[nycphp-talk] IPC Problems
Ben Sgro (ProjectSkyline)
ben at projectskyline.com
Thu Apr 19 10:54:44 EDT 2007
Hi All,
I'm trying to write a script, that forks children to each run a job, provided by
the parent in the form of a job_id, passed by IPC msg_send( ).
Now, when I run the job, I get some problems w/the DB loosing the connection.
I also have problems w/the job_id that's get passed ..sometimes, somehow, its the
PID...I dont get it.
It seems like the IPC is all over the place ... I've included the output from the script,
the database tables, and the source of the script.
Any help is greatly appreaciated. I've read the php.net pcntl_and msg- (IPC) stuff over and over.
What the goal is:
1) Parent, selects the next job to run.
2) Marks job as active =1, stores the PID
3) Forks off a child to run this (job_id passed to child via msg_send( ))
4) Child runs, finishes job and marks as active = 0, incremenents run count, tells parent to kill them
5) Rinse lather repeat
- Ben
[sk at projectskyline clposer]$ php run_job.php
Connected to database clposer
Using database clposer
CHILD
PARENT
Look up job_id to run
SQL Execution Time(0.00062417984008789)
SELECT job_id FROM job_queue WHERE time < now( ) AND active = 0 ORDER BY time
SQL Execution Time(0.00042200088500977)
UPDATE job_queue SET active = 1, run_count = (run_count + 1), pid = 31306 WHERE job_id = 1
CHILD
PARENT
Look up job_id to run
Parent gave me 1 ID to run
SQL Execution Time(0.00090312957763672)
SELECT job_id FROM job_queue WHERE time < now( ) AND active = 0 ORDER BY time
SQL Execution Time(0.0003819465637207)
UPDATE job_queue SET active = 1, run_count = (run_count + 1), pid = 31441 WHERE job_id = 3
Could not successfully run query (UPDATE job_queue SET active = 0, time = now( ) WHERE job_id = 1) from DB: Lost connection to MySQL server during query[sk at projectskyline clposer]$ Parent gave me 3 ID to run
Could not successfully run query (UPDATE job_queue SET active = 0, time = now( ) WHERE job_id = 3) from DB: MySQL server has gone away
mysql> desc job_queue;
+-----------+------------+------+-----+---------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-----------+------------+------+-----+---------------------+----------------+
| job_id | int(11) | NO | PRI | NULL | auto_increment |
| active | tinyint(1) | YES | | 0 | |
| failed | tinyint(1) | YES | | 0 | |
| run_count | int(11) | YES | | 0 | |
| time | datetime | YES | | 0000-00-00 00:00:00 | |
| pid | int(11) | YES | | 0 | |
+-----------+------------+------+-----+---------------------+----------------+
#!/usr/bin/php -q
<?php
/* *** WHEN PUSHING LIVE ****
Has to be manually changed
*/
$REMOTE = 0;
$OOP_INC_PATH = ( $REMOTE == 0 )? '../OOPLIB/' : 'OOPLIB/';
/* Keep include files this order. */
include($OOP_INC_PATH . 'CONS.php');
include($OOP_INC_PATH . 'DBAS.php');
include($OOP_INC_PATH . 'ERRO.php');
include($OOP_INC_PATH . 'UTIL.php');
include('constants.php');
$dbObject = new Database(LOG_LEVEL_NORMAL, $dbSet);
$MSG_QUEUE = msg_get_queue(ftok("tmp/php_msg_queue.stat", 'R'), 0666);
for ( $count = 0; $count < MAX_CHILD_PROCS; $count++ )
{
$pId = pcntl_fork( );
if ( $pId == -1 )
{
printf("Could not fork process");
exit;
}
elseif ( $pId ) /* We are the parent. */
{
echo "PARENT\n";
//printf("I am the parent %d\n", $count);
/* Select the next job to run. */
echo "Look up job_id to run\n";
$dbObject->DatabaseQuery('SELECT job_id FROM ' . DB_TABLE_JOB_QUEUE
. ' WHERE time < now( )'
. ' AND active = 0'
. ' ORDER BY time',
constReturnOne, LOG_LEVEL_NORMAL);
$jobIdToRun = $dbObject->result;
$jobIdToRun = $jobIdToRun['job_id'];
$currentQueue = msg_stat_queue($MSG_QUEUE);
/* Mark this job_id as active and increment the run_count. */
$dbObject->DatabaseQuery('UPDATE ' . DB_TABLE_JOB_QUEUE
. ' SET active = 1,'
. ' run_count = (run_count + 1),'
. ' pid = ' . $currentQueue['msg_lrpid']
. " WHERE job_id = $jobIdToRun",
constReturnNone, LOG_LEVEL_NORMAL);
/* Tell the child what job_id to run. */
msg_send($MSG_QUEUE, 1, $jobIdToRun, true, true, $errmsg);
$killCount = $currentQueue['msg_qnum'];
if ( $killCount > 0 )
{
for ($i = 0; $i < $killCount; $i++ )
{
if ( !msg_receive($MSG_QUEUE, 1, $msg_type, 16384, $msg, true, 0, $errmsg))
{
printf("Error:%s", $errmsg);
}
else
{
pcntl_waitpid($msg, $tmpStat, 0);
}
}
}
}
else /* We are the child. */
{
echo "CHILD\n";
if ( !posix_setsid( ) )
{
printf("Could not detch");
exit;
}
//printf("I am child %d\n", $count);
if ( msg_receive($MSG_QUEUE, 1, $msg_type, 16384, $jobIdToRun, true, 0, $msg_error ) )
{
/* Parent has sent us our job_id */
if ( $jobIdToRun != '' )
{
//$dbObject = new Database(LOG_LEVEL_NORMAL, $dbSet);
echo "Parent gave me $jobIdToRun ID to run\n";
/*
Lets say we ran the job here
*/
$dbObject->DatabaseQuery('UPDATE ' . DB_TABLE_JOB_QUEUE
. ' SET active = 0,'
. ' time = now( )'
. " WHERE job_id = $jobIdToRun", constReturnNone,
LOG_LEVEL_NORMAL);
}
//$jobIdToRun = 0;
/* Tell parent to kill this child. */
if ( !msg_send($MSG_QUEUE, 1, posix_getpid( ), true, true, $errmsg) )
{
printf("msg_send( ) %s", $errmsg);
}
exit(0);
}
}
}
?>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.nyphp.org/pipermail/talk/attachments/20070419/6a0cc9d9/attachment.html>
More information about the talk
mailing list