棘轮PHP WAMP - React/ZeroMQ - 特定用户广播

Jim*_*mbo 18 php zeromq websocket ratchet reactphp

注意:这是一样的这个问题,它利用MessageComponentInterface.我正在使用WampServerInterface,所以这个问题具体涉及那个部分.我需要一个代码示例和解释的答案,因为我可以看到这对将来的其他人有帮助.

试图为个人用户提供循环推送

我正在使用Ratchet和ZeroMQ的WAMP部分,我目前有推送集成教程的工作版本.

我正在尝试执行以下操作:

  • zeromq服务器已启动并正在运行,准备记录订阅者和取消订阅者
  • 用户通过websocket协议在他们的浏览器中连接
  • 一个循环开始将数据发送给特定的用户谁要求它
  • 当用户断开连接时,将停止该用户数据的循环

我有点(1)和(2)工作,但我遇到的问题是第三个:

首先:如何仅向每个特定用户发送数据?广播将它发送给每个人,除非"主题"最终可能是个人用户ID?

其次:我有一个很大的安全问题.如果我发送哪个用户ID想要从客户端订阅,这似乎是我需要的,那么用户可以将变量更改为另一个用户的ID,而是返回他们的数据.

第三:我必须运行一个单独的php脚本,其中包含zeromq的代码以启动实际的循环.我不确定这是最好的方法,我宁愿让它完全在代码库中工作,而不是单独的php文件.这是我需要排序的主要领域.

以下代码显示了我目前拥有的内容.

刚从控制台运行的服务器

我按字面输入php bin/push-server.php来运行它.订阅和取消订阅将输出到此终端以进行调试.

$loop   = React\EventLoop\Factory::create();
$pusher = Pusher;

$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:5555');
$pull->on('message', array($pusher, 'onMessage'));

$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
$webServer = new Ratchet\Server\IoServer(
    new Ratchet\WebSocket\WsServer(
        new Ratchet\Wamp\WampServer(
            $pusher
        )
    ),
    $webSock
);

$loop->run();
Run Code Online (Sandbox Code Playgroud)

通过websockets发送数据的Pusher

我省略了无用的东西,专注于onMessage()onSubscribe()方法.

public function onSubscribe(ConnectionInterface $conn, $topic) 
{
    $subject = $topic->getId();
    $ip = $conn->remoteAddress;

    if (!array_key_exists($subject, $this->subscribedTopics)) 
    {
        $this->subscribedTopics[$subject] = $topic;
    }

    $this->clients[] = $conn->resourceId;

    echo sprintf("New Connection: %s" . PHP_EOL, $conn->remoteAddress);
}

public function onMessage($entry) {
    $entryData = json_decode($entry, true);

    var_dump($entryData);

    if (!array_key_exists($entryData['topic'], $this->subscribedTopics)) {
        return;
    }

    $topic = $this->subscribedTopics[$entryData['topic']];

    // This sends out everything to multiple users, not what I want!!
    // I can't send() to individual connections from here I don't think :S
    $topic->broadcast($entryData);
}
Run Code Online (Sandbox Code Playgroud)

开始在循环中使用上述Pusher代码的脚本

这是我的问题 - 这是一个单独的php文件,希望将来可能会集成到其他代码中,但目前我不确定如何正确使用它.我是否从会话中获取用户的ID?我仍然需要从客户端发送它...

// Thought sessions might work here but they don't work for subscription
session_start();
$userId = $_SESSION['userId'];

$loop   = React\EventLoop\Factory::create();

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");

$i = 0;
$loop->addPeriodicTimer(4, function() use ($socket, $loop, $userId, &$i) {

   $entryData = array(
       'topic'     => 'subscriptionTopicHere',
       'userId'    => $userId
    );
    $i++;

    // So it doesn't go on infinitely if run from browser
    if ($i >= 3)
    {
        $loop->stop();
    }

    // Send stuff to the queue
    $socket->send(json_encode($entryData));
});
Run Code Online (Sandbox Code Playgroud)

最后,客户端js订阅

$(document).ready(function() { 

    var conn = new ab.Session(
        'ws://localhost:8080' 
      , function() {            
            conn.subscribe('topicHere', function(topic, data) {
                console.log(topic);
                console.log(data);
            });
        }
      , function() {          
            console.warn('WebSocket connection closed');
        }
      , {                       
            'skipSubprotocolCheck': true
        }
    );
});
Run Code Online (Sandbox Code Playgroud)

结论

以上是有效的,但我真的需要弄清楚以下内容:

  • 如何向个人用户发送个人消息?当他们访问在JS中启动websocket连接的页面时,我是否还应该启动脚本将内容推入PHP中的队列(zeromq)?这就是我目前正在手动做的事情,而且感觉不对.

  • 从JS订阅用户时,从会话中获取用户ID并从客户端发送它是不安全的.这可能是伪造的.请告诉我有一种更简单的方法,如果是这样,怎么样?

Jim*_*mbo 22

注意:我的答案包括对ZeroMQ的引用,因为我不再使用它.但是,如果您需要,我相信您将能够弄清楚如何使用ZeroMQ.

使用JSON

首先,Websocket RFCWAMP Spec声明要订阅的主题必须是字符串.我在这里作弊,但我仍然坚持规范:我正在通过JSON.

{
    "topic": "subject here",
    "userId": "1",
    "token": "dsah9273bui3f92h3r83f82h3"
}
Run Code Online (Sandbox Code Playgroud)

JSON仍然是一个字符串,但它允许我传递更多数据来代替"主题",并且PHP json_decode()在另一端很容易做到.当然,您应该验证您实际上是否收到了JSON,但这取决于您的实现.

那么我在这里经历了什么,为什么?

  • 话题

主题是用户订阅的主题.您可以使用它来决定将哪些数据传回给用户.

  • 用户身份

显然是用户的ID.您必须使用下一部分验证此用户是否存在并且是否允许订阅:

  • 代币

这应该是一个使用随机生成的令牌,在PHP中生成,并传递给JavaScript变量.当我说"一次使用"时,我的意思是每次你重新加载页面(并且,通过扩展,在每个HTTP请求上),你的JavaScript变量应该有一个新的令牌.此令牌应根据用户的ID存储在数据库中.

然后,一旦发出websocket请求,您就将令牌和用户ID与数据库中的那些匹配,以确保用户确实是他们所说的人,并且他们没有弄乱JS变量.

注意:在事件处理程序中,您可以使用$conn->remoteAddress获取连接的IP,因此如果有人试图恶意连接,您可以阻止它们(记录它们或其他东西).

为什么这样做?

它的工作原理是每次新连接通过时,唯一令牌确保没有用户可以访问任何其他人的订阅数据.

服务器

这是我用于运行循环和事件处理程序的内容.我正在创建循环,完成所有装饰器样式对象的创建,并传入我的EventHandler(我很快就会来),循环也在那里.

$loop = Factory::create();

new IoServer(
    new WsServer(
        new WampServer(
            new EventHandler($loop) // This is my class. Pass in the loop!
        )
    ),
    $webSock
);

$loop->run();
Run Code Online (Sandbox Code Playgroud)

事件处理程序

class EventHandler implements WampServerInterface, MessageComponentInterface
{
    /**
     * @var \React\EventLoop\LoopInterface
     */
    private $loop;

    /**
     * @var array List of connected clients
     */
    private $clients;

    /**
     * Pass in the react event loop here
     */
    public function __construct(LoopInterface $loop)
    {
        $this->loop = $loop;
    }

    /**
     * A user connects, we store the connection by the unique resource id
     */
    public function onOpen(ConnectionInterface $conn)
    {
        $this->clients[$conn->resourceId]['conn'] = $conn;
    }

    /**
     * A user subscribes. The JSON is in $subscription->getId()
     */
    public function onSubscribe(ConnectionInterface $conn, $subscription)
    {
        // This is the JSON passed in from your JavaScript
        // Obviously you need to validate it's JSON and expected data etc...
        $data = json_decode(subscription->getId());

        // Validate the users id and token together against the db values

        // Now, let's subscribe this user only
        // 5 = the interval, in seconds
        $timer = $this->loop->addPeriodicTimer(5, function() use ($subscription) {
            $data = "whatever data you want to broadcast";
            return $subscription->broadcast(json_encode($data));
        });

        // Store the timer against that user's connection resource Id
        $this->clients[$conn->resourceId]['timer'] = $timer;
    }

    public function onClose(ConnectionInterface $conn)
    {
        // There might be a connection without a timer
        // So make sure there is one before trying to cancel it!
        if (isset($this->clients[$conn->resourceId]['timer']))
        {
            if ($this->clients[$conn->resourceId]['timer'] instanceof TimerInterface)
            {
                $this->loop->cancelTimer($this->clients[$conn->resourceId]['timer']);
            }
        }

        unset($this->clients[$conn->resourceId]);
    }

    /** Implement all the extra methods the interfaces say that you must use **/
}
Run Code Online (Sandbox Code Playgroud)

基本上就是这样.这里的要点是:

  • 唯一令牌,用户ID和连接ID提供了确保一个用户无法看到其他用户数据所需的唯一组合.
  • 唯一令牌意味着如果同一个用户打开另一个页面并请求订阅,他们将拥有自己的连接ID +令牌组合,因此同一个用户不会在同一页面上拥有两倍的订阅(基本上,每个连接都有它自己的个人数据).

延期

您应该确保所有数据都经过验证,而不是在您对其进行任何操作之前进行黑客攻击.使用Monolog之类的东西记录所有连接尝试,并在发生任何严重事件时设置电子邮件转发(例如服务器停止工作,因为某人是个混蛋并试图破解您的服务器).

结束点

  • 验证一切.我不能强调这一点.您在每个请求中更改的唯一令牌非常重要.
  • 请记住,如果您在每个HTTP请求上重新生成令牌,并且在尝试通过websockets连接之前发出POST请求,则必须在重新连接之前将重新生成的令牌传回JavaScript:否则您的令牌将无效).
  • 记录一切.记录连接的每个人,询问主题和断开连接.Monolog很适合这个.

  • 如果你想在这里添加另一个答案,展示如何使用`Sessions`部分,我很乐意向你推荐,因为它可能对未来的用户有用,也许我也是:-) (4认同)