Cri*_*bie 9 message-queue amqp rabbitmq
我试图以更加非常规的方式使用RabbitMq(尽管此时我可以选择任何其他消息队列实现,如果需要).消费者不是将Rabbit推送消息留给我的消费者,而是连接到队列并获取一批N个消息(在此期间消耗一些消息并且可能拒绝一些消息),然后它跳转到另一个队列,依此类推.这样做是为了冗余.如果某些消费者崩溃,则保证所有消息都被其他消费者消费.
问题是我有多个消费者,我不希望他们在同一个队列中竞争.有没有办法保证队列锁定?如果没有,我是否至少可以确保如果2个消费者连接到同一个队列,他们不会读取相同的消息?交易可能在某种程度上帮助了我,但我听说他们将从RabbitMQ中删除.
其他建筑建议也受到欢迎.
谢谢!
编辑: 正如评论中所指出的那样,我需要如何处理消息.它们只在组中有意义,并且相关消息很可能在队列中聚集在一起.例如,如果我拉了一批100条消息,那么很有可能我会对1-3,4-5,6-10等消息做一些事情.如果我找不到某些消息的组,我将他们重新提交到队列中.WorkQueue不起作用,因为它会将来自同一组的消息传播给不知道如何处理它们的多个工作人员.
您是否看过这本关于企业集成模式的免费在线书籍?
听起来你真的需要一个工作流程,在消息传递给你的工作人员之前你有一个batcher组件.使用RabbitMQ有两种方法可以做到这一点.使用可以为您进行批处理的交换类型(和消息格式),或者有一个队列,以及一个工作人员,可以对批次进行分类并将每个批处理放在自己的队列中.批处理器可能还应该向控制队列发送"批量就绪"消息,以便工作人员可以发现新批处理队列的存在.处理批处理后,工作人员可以删除批处理队列.
如果您可以控制消息格式,则可以通过几种方式隐式地使RabbitMQ进行批处理.通过主题交换,您可以确保每条消息上的路由键的格式为work.batchid.something,然后了解批处理xxyzz存在的工作人员将使用#.xxyzz等#绑定键.消费这些消息.不需要重新发布.
另一种方法是在标头中包含批处理ID并使用较新的标头交换类型.当然,如果您愿意编写少量的Erlang代码,也可以实现自己的自定义交换类型.
我建议检查一下这本书,因为它比大多数人开始的典型工作队列概念更好地概述了消息传递体系结构.