Phi*_*ord 12 php queue messaging rabbitmq symfony
好的,这是对正在发生的事情的概述:
M <-- Message with unique id of 1234
|
+-Start Queue
|
|
| <-- Exchange
/|\
/ | \
/ | \ <-- bind to multiple queues
Q1 Q2 Q3
\ | / <-- start of the problem is here
\ | /
\ | /
\|/
|
Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
|
C <-- Consumer
Run Code Online (Sandbox Code Playgroud)
所以我有一个推送到多个队列的交换,每个队列都有一个任务,一旦所有任务完成,只有队列4才能启动.
因此,具有唯一ID 1234的消息被发送到交换机,交换机将其路由到所有任务队列(Q1,Q2,Q3等...),当消息ID 1234的所有任务都已完成时,运行Q4以获取消息id 1234.
我该如何实现呢?
使用Symfony2,RabbitMQBundle和RabbitMQ 3.x
资源:
更新#1
好的我觉得这就是我要找的东西:
RPC with Parallel Processing,但是如何将Correlation Id设置为我的唯一ID来对消息进行分组并确定哪个队列?
你需要实现这个:http://www.eaipatterns.com/Aggregator.html但是针对Symfony的RabbitMQBundle不支持,所以你必须使用底层的php-amqplib.
来自捆绑包的普通消费者回调将获得AMQPMessage.从那里,您可以访问该频道并手动发布到"管道和过滤器"实现中接下来的任何交换
在RabbitMQ 站点的RPC教程中,有一种方法可以将“相关 ID”传递给队列中的用户,该 ID 可以识别您的消息。
我建议使用某种 id 将您的消息放入前 3 个队列中,然后使用另一个进程将消息从 3 个队列中出列到某种类型的存储桶中。当这些存储桶收到我假设完成的 3 个任务时,将最终消息发送到第 4 个队列进行处理。
如果您要为一个用户向每个队列发送 1 个以上的工作项目,您可能需要做一些预处理以找出特定用户放入队列的项目数量,以便在 4 之前出队的进程知道在排队之前有多少向上。
我用 C# 做我的 rabbitmq,很抱歉我的伪代码不是 php 风格
// Client
byte[] body = new byte[size];
body[0] = uniqueUserId;
body[1] = howManyWorkItems;
body[2] = command;
// Setup your body here
Queue(body)
Run Code Online (Sandbox Code Playgroud)
// Server
// Process queue 1, 2, 3
Dequeue(message)
switch(message.body[2])
{
// process however you see fit
}
processedMessages[message.body[0]]++;
if(processedMessages[message.body[0]] == message.body[1])
{
// Send to queue 4
Queue(newMessage)
}
Run Code Online (Sandbox Code Playgroud)
对更新 #1 的回应
与其将客户端视为终端,不如将客户端视为服务器上的进程。所以,如果你喜欢安装在服务器上的RPC客户端这一块,那么所有你需要做的是让服务器处理用户的唯一ID的生成和发送消息到合适的队列:
public function call($uniqueUserId, $workItem) {
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
serialize(array($uniqueUserId, $workItem)),
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
// We assume that in the response we will get our id back
return deserialize($this->response);
}
$rpc = new Rpc();
// Get unique user information and work items here
// Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need.
$response = rpc->call($uniqueUserId, $workItem);
$responseBuckets[array[0]]++;
// Just like above code that sees if a bucket is full or not
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
4588 次 |
最近记录: |