jam*_*amg 5 amqp rabbitmq node-amqp
我想创建一个消费者来处理来自多个可变数量的源的消息,这些源动态连接或断开连接。
\n\n我需要的是每个消费者优先考虑每个来源的前 N 条消息。然后运行多个消费者来提高速度。
\n\n我一直在阅读工作队列、路由和主题的文档,以及许多其他文档,但没有确定如何实现这一点。我还做了一些测试,但没有运气。
\n\n有人可以指出我该怎么做或在哪里阅读有关它的内容吗?
\n\n- 编辑 -
\n\n队列A-----A3--A2--A1-\xe2\x94\x90
\n\nQueueB-----B3--B2--B1-\xe2\x94\xbc--------消费者
\n\n队列C-----C3--C2--C1-\xe2\x94\x98
\n\n期望的效果是每个消费者获取每个队列的第一条消息。例如:A1、B1、C1、A2、B2、C2、A3、B3、C3 等。如果创建了一个新队列 (QueueD),消费者将以相同的方式开始从该队列接收消息。
\n\n提前致谢
\n我需要的是每个消费者优先考虑每个来源的前 N 条消息。然后运行多个消费者来提高速度。
据我所知,所有消息队列仅在队列本身内提供排序保证(Kafka 不在队列级别提供排序保证,而是在队列内的分区内提供排序保证)。但是,这里您要求序列化多个队列。这在分布式系统环境中是不可能的。
为什么?因为如果这些队列有多个消费者,消息将以循环方式传递给队列的每个连接的消费者。
假设prefetch_count=1有两个连接的消费者,第一组消息传递如下:
现在,在分布式系统中,一切都是异步的,并且事情可能会出错。例如:
如果X确认A1,则A3将被传递给X。但如果Y在X之前确认A2,则A3将被传递给Y。
在分布式系统中,谁先确认不在你的控制范围内。考虑以下场景:
我强烈建议您重新考虑您的要求,并在异步上下文中考虑您的预期保证(否则您不会考虑 MoM,不是吗?)。
PS:可以使用一些消费者端逻辑来实现您所要求的内容(但会降低性能/吞吐量)。
我不建议这样做,但是嘿,这是你的系统,谁会阻止你;)