Skip to content

Instantly share code, notes, and snippets.

@J7mbo
Created October 4, 2013 09:39
Show Gist options
  • Select an option

  • Save J7mbo/6823472 to your computer and use it in GitHub Desktop.

Select an option

Save J7mbo/6823472 to your computer and use it in GitHub Desktop.

Revisions

  1. J7mbo created this gist Oct 4, 2013.
    85 changes: 85 additions & 0 deletions gistfile1.php
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,85 @@
    /**
    * User subscribes with their subject:userId:token
    *
    * If token does not match userId (check DB), return a 403 else...
    *
    * Token is regenerated and placed in DB for next time the page is visited,
    * userId is retrieved that matches token.
    *
    * New process spawned which runs looping zmq with argv being userId from
    * previous DB check. Process Id retrieved.
    *
    * Add conn, subject, userId, processId, ip and new token to $clients array
    *
    * Print above details to console for debugging (or log all this)
    *
    * @param \Ratchet\ConnectionInterface $conn
    * @param type $topic
    */
    public function onSubscribe(ConnectionInterface $conn, $subscription)
    {
    list($subject, $userId, $token) = explode(':', $subscription->getId());

    $subscription->add($conn);

    $resourceId = $conn->resourceId;

    $pidArr = [];
    $cmd = sprintf('%s%s --userId=%d --subject=%s --resourceId=%s', self::CMD_PREFIX, self::CMD_TORRENTS, $userId, $subject, $resourceId);
    exec(sprintf("%s > /dev/null 2>&1 & echo $!;", $cmd), $pidArr);
    $pid = $pidArr[0];

    if (empty($pidArr) || $pid == 0)
    {
    echo sprintf("Couldn't create a new process for user: %d", $userId);
    return;
    }

    if (!array_key_exists($resourceId, $this->clients))
    {
    $this->clients[$resourceId]['userId'] = $userId;
    $this->clients[$resourceId]['token'] = $token;
    $this->clients[$resourceId]['conn'] = $conn;
    $this->clients[$resourceId]['subject'] = $subject;
    $this->clients[$resourceId]['subscription'] = $subscription;
    $this->clients[$resourceId]['pid'] = $pid;
    }

    echo sprintf("Client Connected: %s, userId: %d, subject: %s" . PHP_EOL, $conn->remoteAddress, $userId, $subject);
    }

    /**
    * @param string JSON'ified string we'll receive from ZeroMQ
    */
    public function onMessage($data)
    {
    $data = json_decode($data, true);

    var_dump($data);

    $result = reset(array_filter($this->clients, function($client) use ($data) {
    return $client['conn']->resourceId == $data['resourceId'];
    }));

    if ($result)
    {
    $result['subscription']->broadcast($data);
    }
    else
    {
    echo "Couldn't find user in this->clients";
    return;
    }
    }

    public function onClose(ConnectionInterface $conn)
    {
    $resourceId = $conn->resourceId;
    $result = reset(array_filter($this->clients, function($client) use ($resourceId) {
    return $client['conn']->resourceId == $resourceId;
    }));

    posix_kill($result['pid'], SIGKILL);
    unset($this->clients[$conn->resourceId]);
    echo sprintf("Client Disconnected: %s" . PHP_EOL, $conn->remoteAddress);
    }