Skip to content

Instantly share code, notes, and snippets.

Created April 30, 2012 16:21
Show Gist options
  • Select an option

  • Save anonymous/2559738 to your computer and use it in GitHub Desktop.

Select an option

Save anonymous/2559738 to your computer and use it in GitHub Desktop.
Worker thingy for Dave
<?php
/**
Realised my tweet was unclear; what I meant is:
1) Close off the socket and context in the child process after forking, before creating the child context/socket - you need both bits so that it shuts down, else the context will wait for the socket
2) Turn off 'persistence' with a flag, else it'll take it out of the global table internal to PHP (which is kept when forking, of course).
Hacked-in changes below:
*/
// Entry point: build the pool manager, then hand control to its main loop.
// run() contains an unconditional while-loop, so nothing after this call executes.
$worker = new Worker();
$worker->run();
/**
 * Pre-forking worker pool.
 *
 * The parent process binds a ZeroMQ REP socket on an IPC endpoint named after
 * its own PID, keeps up to $poolSize forked children alive, and answers every
 * request a child sends. Each child connects a REQ socket to that endpoint,
 * sends messages in a loop and randomly exits so the parent has something to
 * respawn.
 */
class Worker
{
    /**
     * Number of processes in pool
     *
     * @var integer
     */
    private $poolSize = 5;

    /**
     * Parent PID, captured in the constructor (i.e. before any fork) and used
     * to build the IPC endpoint name shared by parent and children.
     *
     * @var integer
     */
    private $parentPid;

    /**
     * ZeroMQ context of the current process. Replaced in the child after
     * forking (see doWork()).
     *
     * @var ZMQContext
     */
    private $context;

    /**
     * REP socket the parent listens on for messages from children.
     *
     * @var ZMQSocket
     */
    private $receiver;

    /**
     * Workers in pool - the ones that actually do the work.
     * Keyed by child PID; the value is always TRUE.
     *
     * @var array
     */
    private $children = array();

    /**
     * Exit statuses of children that were reaped before launchWorker() had
     * recorded them in $children. Keyed by child PID.
     *
     * @var array
     */
    private $signalQueue = array();

    /**
     * Remember the parent PID and register the SIGCHLD handler.
     */
    public function __construct()
    {
        $this->parentPid = getmypid();
        pcntl_signal(SIGCHLD, array($this, 'childSignalHandler'));
    }

    /**
     * Parent main loop: keep the pool full and answer child requests.
     *
     * Never returns.
     *
     * @return void
     */
    public function run()
    {
        $endpoint = "ipc:///tmp/worker-{$this->parentPid}";

        // fire up the parent socket - as REP - where we'll be listening
        // for messages from children. The second constructor argument turns
        // context persistence off.
        $this->context = new ZMQContext(1, false);
        $this->receiver = new ZMQSocket($this->context, ZMQ::SOCKET_REP);
        $this->receiver->bind($endpoint);
        echo "PARENT\tbinding to {$endpoint}\n";

        $poll = new ZMQPoll();
        $poll->add($this->receiver, ZMQ::POLL_IN);
        $readable = $writeable = array();

        while (TRUE) {
            // Handlers installed with pcntl_signal() only run when pending
            // signals are dispatched. Without this call childSignalHandler()
            // is never invoked, so dead children would stay in $children
            // forever and the pool would never be refilled.
            pcntl_signal_dispatch();

            if (count($this->children) < $this->poolSize) {
                // @todo throttle how often we can spawn
                $this->launchWorker();
            }

            // these are just here so we can see at STDOUT what's going on
            echo "PARENT\tLooping...\n";
            sleep(1);

            // Wait up to 100 ms for a request; $readable is filled by poll().
            $poll->poll($readable, $writeable, 100);
            foreach ($readable as $socket) {
                if ($socket === $this->receiver) {
                    $msg = $socket->recv();
                    echo "PARENT\treceived $msg\n";
                    $socket->send("My reply");
                }
            }
        }
    }

    /**
     * Handle SIGCHLD: reap every exited child and drop it from the pool.
     *
     * Called in two ways:
     *  - by the signal dispatcher, in which case the second argument is not a
     *    PID (it is absent on old PHP versions and a siginfo array on
     *    PHP >= 7.1) and the PID is discovered with pcntl_waitpid();
     *  - directly by launchWorker() with an explicit PID and status taken
     *    from the signal queue.
     *
     * @param integer            $sigNo  The signal number
     * @param integer|array|NULL $pid    The process ID, or anything that is
     *                                   not an integer to have it looked up
     * @param integer|NULL       $status Wait status belonging to $pid
     *
     * @return boolean Always TRUE
     */
    public function childSignalHandler($sigNo, $pid = NULL, $status = NULL)
    {
        // Anything that is not a real PID (NULL, 0, or the siginfo array the
        // dispatcher passes) means we have to figure out which child ended.
        if (!is_int($pid) || $pid === 0) {
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }

        // make sure we get all of the exited children
        while ($pid > 0) {
            if (isset($this->children[$pid])) {
                $exitCode = pcntl_wexitstatus($status);
                if ($exitCode != 0) {
                    echo "PARENT\tChild $pid exited with status $exitCode\n";
                }
                unset($this->children[$pid]);
            } else {
                // job has finished before this parent process could even
                // note that it had been launched!
                echo "PARENT\tadding $pid to signal queue\n";
                $this->signalQueue[$pid] = $status;
            }
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }

        return TRUE;
    }

    /**
     * Launch worker
     *
     * Forks once. The parent records the child PID (and replays a queued exit
     * status if the child was already reaped); the child runs doWork() and
     * then exits.
     *
     * @return void
     */
    private function launchWorker()
    {
        $pid = pcntl_fork();

        if ($pid === -1) {
            echo "PARENT\terror forking child\n";
            return;
        }

        if ($pid === 0) {
            // CHILD
            echo "CHILD\tstarting up child\n";
            $this->doWork();
            exit(0);
        }

        // PARENT
        $this->children[$pid] = TRUE;
        echo "PARENT\tspawned child $pid so now have " . count($this->children) . " workers active\n";

        if (isset($this->signalQueue[$pid])) {
            echo "PARENT\tFound signal in queue, processing now...\n";
            $this->childSignalHandler(SIGCHLD, $pid, $this->signalQueue[$pid]);
            unset($this->signalQueue[$pid]);
        }
    }

    /**
     * Do some actual work
     *
     * Runs in the child only. Never returns: the loop ends by exiting the
     * process.
     *
     * @return void
     */
    private function doWork()
    {
        // Drop the socket and context inherited from the parent before
        // creating the child's own: socket first, then the context.
        unset($this->receiver);
        unset($this->context);

        $endpoint = "ipc:///tmp/worker-{$this->parentPid}";
        $this->context = new ZMQContext(1, false);
        $socket = new ZMQSocket($this->context, ZMQ::SOCKET_REQ);
        $socket->connect($endpoint);
        echo "CHILD\tConnecting to {$endpoint}\n";

        $i = 0;
        while (TRUE) {
            $i++;
            echo "CHILD\tLooping...\n";

            // just sticking the 5 thing in so you can see it looping a bit first
            if ($i > 5) {
                $socket->send("Some message");
                echo "CHILD\tSent a message...\n";
                $back = $socket->recv();
                echo "CHILD\tReceived back $back\n";
            }

            // Roughly one iteration in ten, die so the parent has to respawn.
            if (rand(1, 10) === 1) {
                echo "CHILD\tdieing\n";
                exit();
            }

            sleep(1);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment