rva*_*iev 7 php sockets rabbitmq websocket swoole
我正在尝试使用websockets将一些数据从php应用程序发送到用户的浏览器.因此我决定将Swoole与RabbitMQ结合使用.
这是我第一次使用websockets并在阅读了一些关于Socket.IO,Ratchet等的帖子后,我决定停止使用Swoole,因为它是用C编写的,并且可以方便地与php一起使用.
这就是我理解使用websockets启用数据传输的想法:1)在CLI中启动RabbitMQ worker和Swoole服务器2)php应用程序向RabbitMQ发送数据3)RabbitMQ向worker发送带有数据的消息4)Worker接收带有数据的消息+建立与Swoole套接字服务器的套接字连接.5)Swoole服务器向所有连接广播数据
问题是如何将Swoole套接字服务器与RabbitMQ绑定?或者如何让RabbitMQ与Swoole建立连接并向其发送数据?
这是代码:
Swoole服务器(swoole_sever.php)
$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);
$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
echo "connection open: {$req->fd}\n";
});
$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
echo "received message: {$frame->data}\n";
$server->push($frame->fd, json_encode(["hello", "world"]));
});
$server->on('close', function($server, $fd)
{
echo "connection close: {$fd}\n";
});
$server->start();
Run Code Online (Sandbox Code Playgroud)
工作者从RabbitMQ接收消息,然后连接到Swoole并通过套接字连接(worker.php)广播消息
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
// Here I'm trying to make connection to Swoole server and sernd data
$cli = new \swoole_http_client('0.0.0.0', 2345);
$cli->on('message', function ($_cli, $frame) {
var_dump($frame);
});
$cli->upgrade('/', function($cli)
{
$cli->push('This is the message to send to Swoole server');
$cli->close();
});
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
Run Code Online (Sandbox Code Playgroud)
将消息发送到RabbitMQ的新任务(new_task.php):
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
Run Code Online (Sandbox Code Playgroud)
在启动swoole服务器和worker后,我从命令行触发new_task.php:
php new_task.php
Run Code Online (Sandbox Code Playgroud)
在运行RabbitMQ Worker的命令行提示符(worker.php)中,我可以看到消息已传递给worker("[x] Received Hello World!"消息正在出现).
但是在Swoole服务器运行的命令行提示符中什么也没发生.
所以问题是:1)这种方法的想法是否正确?2)我做错了什么?
小智 1
worker.php在收到消息时触发的回调(in)中,您使用的swoole_http_client只是异步的。这似乎导致代码永远不会完全执行,因为回调函数在触发异步代码之前返回。
做同样事情的同步方法将解决这个问题。这是一个简单的例子:
$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();
Run Code Online (Sandbox Code Playgroud)
在github上查看WebSocketClient类和示例用法。
您还可以将其包装在协程中,如下所示:
go(function () {
$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();
});
Run Code Online (Sandbox Code Playgroud)