Created
October 4, 2013 09:39
-
-
Save J7mbo/6823472 to your computer and use it in GitHub Desktop.
Revisions
-
J7mbo created this gist
Oct 4, 2013 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,85 @@ /** * User subscribes with their subject:userId:token * * If token does not match userId (check DB), return a 403 else... * * Token is regenerated and placed in DB for next time the page is visited, * userId is retrieved that matches token. * * New process spawned which runs looping zmq with argv being userId from * previous DB check. Process Id retrieved. * * Add conn, subject, userId, processId, ip and new token to $clients array * * Print above details to console for debugging (or log all this) * * @param \Ratchet\ConnectionInterface $conn * @param type $topic */ public function onSubscribe(ConnectionInterface $conn, $subscription) { list($subject, $userId, $token) = explode(':', $subscription->getId()); $subscription->add($conn); $resourceId = $conn->resourceId; $pidArr = []; $cmd = sprintf('%s%s --userId=%d --subject=%s --resourceId=%s', self::CMD_PREFIX, self::CMD_TORRENTS, $userId, $subject, $resourceId); exec(sprintf("%s > /dev/null 2>&1 & echo $!;", $cmd), $pidArr); $pid = $pidArr[0]; if (empty($pidArr) || $pid == 0) { echo sprintf("Couldn't create a new process for user: %d", $userId); return; } if (!array_key_exists($resourceId, $this->clients)) { $this->clients[$resourceId]['userId'] = $userId; $this->clients[$resourceId]['token'] = $token; $this->clients[$resourceId]['conn'] = $conn; $this->clients[$resourceId]['subject'] = $subject; $this->clients[$resourceId]['subscription'] = $subscription; $this->clients[$resourceId]['pid'] = $pid; } echo sprintf("Client Connected: %s, userId: %d, subject: %s" . PHP_EOL, $conn->remoteAddress, $userId, $subject); } /** * @param string JSON'ified string we'll receive from ZeroMQ */ public function onMessage($data) { $data = json_decode($data, true); var_dump($data); $result = reset(array_filter($this->clients, function($client) use ($data) { return $client['conn']->resourceId == $data['resourceId']; })); if ($result) { $result['subscription']->broadcast($data); } else { echo "Couldn't find user in this->clients"; return; } } public function onClose(ConnectionInterface $conn) { $resourceId = $conn->resourceId; $result = reset(array_filter($this->clients, function($client) use ($resourceId) { return $client['conn']->resourceId == $resourceId; })); posix_kill($result['pid'], SIGKILL); unset($this->clients[$conn->resourceId]); echo sprintf("Client Disconnected: %s" . PHP_EOL, $conn->remoteAddress); }