-
-
Save davegardnerisme/2559860 to your computer and use it in GitHub Desktop.
Worker thingy for Dave
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| /** | |
| Realised my tweet was unclear: what I meant is: | |
| 1) Close of the socket and context in the child process after forking before creating the child context/socket - you need both bits so that it shuts down, else the context will wait for the socket | |
| 2) Turn off 'persistance' with a flag, else it'll take it out of the global table internal to PHP (which is kept when forking of course). | |
| Hacked in changed below: | |
| */ | |
| $w = new Worker; | |
| $w->run(); | |
| class Worker | |
| { | |
| /** | |
| * Number of processes in pool | |
| * | |
| * @var integer | |
| */ | |
| private $poolSize = 5; | |
| /** | |
| * Parent PID | |
| * | |
| * @var integer | |
| */ | |
| private $parentPid; | |
| private $context; | |
| private $receiver; | |
| /** | |
| * Workers in pool - the ones that actually do the work | |
| * | |
| * @var array | |
| */ | |
| private $children = array(); | |
| public function __construct() | |
| { | |
| $this->parentPid = getmypid(); | |
| pcntl_signal(SIGCHLD, array($this, 'childSignalHandler')); | |
| } | |
| public function run() | |
| { | |
| // fire up the parent socket - as REP - where we'll be listening | |
| // for messages from children | |
| $this->context = new ZMQContext(1,0); | |
| $this->receiver = new ZMQSocket($this->context, ZMQ::SOCKET_REP); | |
| $this->receiver->bind("ipc:///tmp/worker-{$this->parentPid}"); | |
| echo "PARENT\tbinding to ipc:///tmp/worker-{$this->parentPid}\n"; | |
| $poll = new ZMQPoll(); | |
| $poll->add($this->receiver, ZMQ::POLL_IN); | |
| $readable = $writeable = array(); | |
| while (TRUE) { | |
| if (count($this->children) < $this->poolSize) { | |
| // @todo throttle how often we can spawn | |
| $this->launchWorker(); | |
| } | |
| // these are just here so we can see at STDOUT what's going on | |
| echo "PARENT\tLooping...\n"; | |
| sleep(1); | |
| $events = $poll->poll($readable, $writeable, 100); | |
| foreach ($readable as $socket) { | |
| if ($socket === $this->receiver) { | |
| $msg = $socket->recv(); | |
| echo "PARENT\treceived $msg\n"; | |
| $socket->send("My reply"); | |
| } | |
| } | |
| } | |
| } | |
| /** | |
| * Handle signal to child | |
| * | |
| * @param integer|NULL $sigNo The signal number | |
| * @param integer|NULL $pid The process ID | |
| * @param type $status | |
| * | |
| * @return boolean | |
| */ | |
| public function childSignalHandler($sigNo, $pid = NULL, $status = NULL) | |
| { | |
| // if no pid is provided, that means we're getting the signal | |
| // from the system. Let's figure out which child process ended | |
| if (!$pid) { | |
| $pid = pcntl_waitpid(-1, $status, WNOHANG); | |
| } | |
| // make sure we get all of the exited children | |
| while ($pid > 0) { | |
| if ($pid && isset($this->children[$pid])) { | |
| $exitCode = pcntl_wexitstatus($status); | |
| if ($exitCode != 0) { | |
| echo "PARENT\tChild $pid exited with status $exitCode\n"; | |
| } | |
| unset($this->children[$pid]); | |
| } elseif ($pid) { | |
| // job has finished before this parent process could even | |
| // note that it had been launched! | |
| echo "PARENT\tadding $pid to signal queue\n"; | |
| $this->signalQueue[$pid] = $status; | |
| } | |
| $pid = pcntl_waitpid(-1, $status, WNOHANG); | |
| } | |
| return TRUE; | |
| } | |
| /** | |
| * Launch worker | |
| */ | |
| private function launchWorker() | |
| { | |
| $pid = pcntl_fork(); | |
| if ($pid === -1) { | |
| echo "PARENT\terror forking child\n"; | |
| } elseif ($pid) { | |
| // PARENT | |
| $this->children[$pid] = TRUE; | |
| echo "PARENT\tspawned child $pid so now have " . count($this->children) . " workers active\n"; | |
| if (isset($this->signalQueue[$pid])){ | |
| echo "PARENT\tFound signal in queue, processing now...\n"; | |
| $this->childSignalHandler(SIGCHLD, $pid, $this->signalQueue[$pid]); | |
| unset($this->signalQueue[$pid]); | |
| } | |
| sleep(5); | |
| } else { | |
| // CHILD | |
| echo "CHILD\tstarting up child\n"; | |
| $this->doWork(); | |
| exit(0); | |
| } | |
| } | |
| /** | |
| * Do some actual work | |
| */ | |
| private function doWork() | |
| { | |
| unset($this->receiver); | |
| unset($this->context); | |
| $this->context = new ZMQContext(1,0); | |
| $socket = new ZMQSocket($this->context, ZMQ::SOCKET_REQ); | |
| $socket->connect("ipc:///tmp/worker-{$this->parentPid}"); | |
| echo "CHILD\tConnecting to ipc:///tmp/worker-{$this->parentPid}\n"; | |
| $i=0; | |
| while (TRUE) { | |
| $i++; | |
| echo "CHILD\tLooping...\n"; | |
| // just sticking the 5 thing in so you can see it looping a bit first | |
| if ($i>5) { | |
| $socket->send("Some message"); | |
| echo "CHILD\tSent a message...\n"; | |
| $back = $socket->recv(); | |
| echo "CHILD\tReceived back $back\n"; | |
| } | |
| if(rand(1, 10) == 1) { | |
| echo "CHILD\rdieing"; | |
| exit(); | |
| } | |
| sleep(1); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment