/** * User subscribes with their subject:userId:token * * If token does not match userId (check DB), return a 403 else... * * Token is regenerated and placed in DB for next time the page is visited, * userId is retrieved that matches token. * * New process spawned which runs looping zmq with argv being userId from * previous DB check. Process Id retrieved. * * Add conn, subject, userId, processId, ip and new token to $clients array * * Print above details to console for debugging (or log all this) * * @param \Ratchet\ConnectionInterface $conn * @param type $topic */ public function onSubscribe(ConnectionInterface $conn, $subscription) { list($subject, $userId, $token) = explode(':', $subscription->getId()); $subscription->add($conn); $resourceId = $conn->resourceId; $pidArr = []; $cmd = sprintf('%s%s --userId=%d --subject=%s --resourceId=%s', self::CMD_PREFIX, self::CMD_TORRENTS, $userId, $subject, $resourceId); exec(sprintf("%s > /dev/null 2>&1 & echo $!;", $cmd), $pidArr); $pid = $pidArr[0]; if (empty($pidArr) || $pid == 0) { echo sprintf("Couldn't create a new process for user: %d", $userId); return; } if (!array_key_exists($resourceId, $this->clients)) { $this->clients[$resourceId]['userId'] = $userId; $this->clients[$resourceId]['token'] = $token; $this->clients[$resourceId]['conn'] = $conn; $this->clients[$resourceId]['subject'] = $subject; $this->clients[$resourceId]['subscription'] = $subscription; $this->clients[$resourceId]['pid'] = $pid; } echo sprintf("Client Connected: %s, userId: %d, subject: %s" . PHP_EOL, $conn->remoteAddress, $userId, $subject); } /** * @param string JSON'ified string we'll receive from ZeroMQ */ public function onMessage($data) { $data = json_decode($data, true); var_dump($data); $result = reset(array_filter($this->clients, function($client) use ($data) { return $client['conn']->resourceId == $data['resourceId']; })); if ($result) { $result['subscription']->broadcast($data); } else { echo "Couldn't find user in this->clients"; return; } } public function onClose(ConnectionInterface $conn) { $resourceId = $conn->resourceId; $result = reset(array_filter($this->clients, function($client) use ($resourceId) { return $client['conn']->resourceId == $resourceId; })); posix_kill($result['pid'], SIGKILL); unset($this->clients[$conn->resourceId]); echo sprintf("Client Disconnected: %s" . PHP_EOL, $conn->remoteAddress); }