我需要为我的新项目选择一个新的队列代理.
这次我需要一个支持pub/sub的可伸缩队列,并且必须保持消息排序.
我读过亚历克西斯的评论:他写道:
"事实上,我们认为RabbitMQ比Kafka提供更强的订购"
我在rabbitmq docs中阅读了消息订购部分:
"消息可以使用AMQP方法返回队列,这些方法具有重新排队参数(basic.recover,basic.reject和basic.nack),或者由于在保留未确认的消息时关闭了通道...使用2.7.0及更高版本它仍然有可能为个人消费者,观察消息无序如果队列中有多个用户.这是因为谁可能重新排队消息的其他用户的行为.从队列中的消息总是在发布顺序举行的视角. "
如果我需要按订单处理消息,我只能使用带有独占队列的rabbitMQ给每个消费者吗?
RabbitMQ仍然被认为是有序消息排队的一个很好的解决方案吗?
有没有办法计算一个工作被重新排队的时间(通过拒绝或Nak)而无需手动重新计算工作?我需要在'n'时间重试一份工作,然后在'n'时间之后放弃它.
ps:目前我手动重新排队作业(删除旧作业,创建具有确切内容的新作业,如果计数器不存在或者值小于'n',则额外的Counter标题)
我创建了一个简单的发布者和使用者在队列上订阅的消费者basic.consume.
我的消费者在作业无异常运行时确认消息.每当我遇到异常时,我都不会收到消息并提前返回.只有已确认的消息才会从队列中消失,因此工作正常.
现在我希望消费者再次接收失败的消息,但重新生成这些消息的唯一方法是重新启动消费者.
我该如何处理这个用例?
设置代码
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');
Run Code Online (Sandbox Code Playgroud)
消费者代码
$queue->consume(array($this, 'callback'));
public function callback(AMQPEnvelope $msg)
{
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return;
}
return $queue->ack($msg->getDeliveryTag());
}
Run Code Online (Sandbox Code Playgroud)
制片人代码
$exchange->publish('message');
Run Code Online (Sandbox Code Playgroud) 我已经看过博客文章,以及从这个问题回答#2 如何在RabbitMQ中设置一些重试尝试?建议在RabbitMQ消息上跟踪重试次数的一种方法是使用x-redelivered-count标头再次发布它.
这是通过在消息上设置标题来完成的,因为它将转向死信交换,或者通过使用与之前相同的标题和正文制作消息的新副本来完成,但加上增加x-redelivered-count(同时确认旧消息)消息的副本?),如果可以用前者完成,如何在消息被删除之前编辑消息的标题或正文?