如何在确保每个实体FIFO的同时并行处理消息?

Jam*_*per 5 concurrency activemq-classic message-queue rabbitmq hornetq

假设您的系统中有一个实体,例如“ Person”,并且您想处理修改各种Person实体的事件。重要的是:

  • 同一个人的事件以FIFO顺序处理
  • 多个Person事件流由不同的线程/进程并行处理

我们有一个使用共享数据库和锁来解决此问题的实现。线程竞争为一个人获取锁,然后在获取锁后按顺序处理事件。我们希望移到消息队列中以避免轮询和锁定,我们认为这将减少数据库的负载并简化使用者代码的实现。

我已经对ActiveMQ,RabbitMQ和HornetQ进行了一些研究,但是我看不到实现这一目标的明显方法。

ActiveMQ支持使用者订阅通配符,但是我没有看到一种将每个队列的并发限制为1的方法。如果可以,那么解决方案将很简单:

  • 不知何故告诉代理,以/ queue / person开头的所有队列允许并发1。
  • 发布者使用队列名称中的“人员ID”将事件写入队列。例如:/queue/person.20
  • 消费者使用通配符订阅队列:/ queue / person。
  • 每个消费者将收到针对不同人员队列的消息。如果所有人队列都在使用中,则某些消费者可能会闲置,没关系
  • 处理完一条消息后,使用者将发送一个ACK,告知代理已完成该消息,并允许将用于该Person队列的另一条消息发送给另一个使用者(可能是同一条)

ActiveMQ即将结束:您可以进行通配符订阅并启用“独占使用者”,但是这种结合导致单个使用者接收发送到所有匹配队列的所有消息,从而将所有人员的并发性降低为1。我觉得我缺少明显的东西。

问题:

  • 是否可以通过任何主要的消息队列实现来实现上述方法?我们对选择持开放态度。唯一的要求是它必须在Linux上运行。
  • 有没有其他方法可以解决我不考虑的一般问题?

谢谢!

Jam*_*per 4

看起来 JMSXGroupID 就是我正在寻找的。来自 ActiveMQ 文档:

http://activemq.apache.org/message-groups.html

他们的股票价格示例用例正是我所追求的。我唯一担心的是如果单个消费者死亡会发生什么。希望代理能够检测到这一点并选择另一个消费者与该组 ID 关联。