小编guh*_*pos的帖子

使用Python,Pika和AMQP设计异步RPC应用程序的最佳模式是什么?

我的应用程序的生产者模块由想要提交要在小型集群上完成的工作的用户运行.它通过RabbitMQ消息代理以JSON格式发送订阅.

我已经尝试了几种策略,到目前为止最好的是以下,但仍然没有完全发挥作用:

每个集群计算机都运行一个使用者模块,该模块将自己订阅到AMQP队列并发出prefetch_count来告诉代理一次可以运行多少任务.

我能够使用Pika AMQP库中的SelectConnection使其工作.消费者和生产者都启动两个通道,一个连接到每个队列.制作者在频道[A]上发送请求并等待频道[B]中的响应,并且消费者等待频道[A]上的请求并在频道[B]上发送响应.但是,似乎当消费者运行计算响应的回调时,它会阻塞,所以每次只有每个消费者执行一个任务.

我到底需要什么:

  1. 消费者[A]将他的任务(每次约5k)订阅到集群
  2. 代理为每个使用者分派N个消息/请求,其中N是它可以处理的并发任务的数量
  3. 当一个任务完成时,消费者用结果回复经纪人/生产者
  4. 生产者收到回复,更新计算状态,最后打印一些报告

限制:

  • 如果另一个用户提交工作,他的所有任务将在前一个用户之后排队(我猜这是从队列系统自动生效的,但我没有想到对线程环境的影响)
  • 任务有一个订单要提交,但他们回复的订单并不重要

UPDATE

我已经进一步研究了一下,我的实际问题似乎是我使用一个简单的函数作为回调到pika的SelectConnection.channel.basic_consume()函数.我的最后一个(未实现的)想法是传递线程函数,而不是常规函数,因此回调不会阻塞,消费者可以继续监听.

python design-patterns amqp rabbitmq pika

9
推荐指数
1
解决办法
5009
查看次数

标签 统计

amqp ×1

design-patterns ×1

pika ×1

python ×1

rabbitmq ×1