使用amqp从队列中解复用消息以并行流处理?

Lui*_*ñiz 9 java spring amqp rabbitmq reactive-programming

我试图找出是否可以从阻塞场景切换到更具反应性的模式.

我有传入的更新命令到达队列,我需要按顺序处理它们,但只需要处理相同实体的那些命令.本质上,我可以创建任意数量的并行更新事件流,只要没有两个流包含有关同一实体的事件.

我在想主要队列的消费者可能能够利用amqp的路由机制和临时队列,为每个实体id创建临时队列,并将消费者挂钩.一旦订户完成并且当前没有关于所讨论的实体的其他事件在队列中,则可以丢弃该队列.

这种情况是经常使用的吗?有没有更好的方法来实现这一目标?在我们当前的系统中,我们使用基于id的命名锁来防止并发更新.

Zar*_*tra 1

至少有 2 个选项:

每个实体一个队列 ,一个实体队列上有 n 个消费者。

一个队列包含所有实体的消息。消息包含实体的数据。您可以将其拆分为多个队列(一种类型的实体使用一个 AMQP 队列)或使用BlockingQueue实现。

拆分 qmqp 队列中的实体的好处

  • 您可以使用rabbitmq创建一个ha-setup
  • 您可以路由消息
  • 如果有一天有必要的话,您可能可以拥有一个实体队列的多个使用者(可扩展性)
  • 消息可以是持久的,因此可以在应用程序崩溃时恢复

使用内部 BlockingQueue 实现的好处

  • 它更快(显然没有 net-io)
  • 一切都必须在一个 JVM 中发生

无论如何,这确实取决于您想要什么,因为两种方式都有其好处。

更新: 我不确定现在是否明白你的意思,但让我给你一些资源来尝试一些事情。有一些特殊的rabbitmq扩展,也许其中一些可以给你一个想法。看一下备用交换交换到交换绑定。

另外,对于基本测试,我不确定它是否涵盖了所有rabbitmq功能或所有amqp功能,但这有时很有用。请记住,此可视化中的路由键是生产者名称,您还可以在其中找到一些示例。导入和导出您的配置。