我的应用程序的生产者模块由想要提交要在小型集群上完成的工作的用户运行.它通过RabbitMQ消息代理以JSON格式发送订阅.
我已经尝试了几种策略,到目前为止最好的是以下,但仍然没有完全发挥作用:
每个集群计算机都运行一个使用者模块,该模块将自己订阅到AMQP队列并发出prefetch_count来告诉代理一次可以运行多少任务.
我能够使用Pika AMQP库中的SelectConnection使其工作.消费者和生产者都启动两个通道,一个连接到每个队列.制作者在频道[A]上发送请求并等待频道[B]中的响应,并且消费者等待频道[A]上的请求并在频道[B]上发送响应.但是,似乎当消费者运行计算响应的回调时,它会阻塞,所以每次只有每个消费者执行一个任务.
我到底需要什么:
限制:
UPDATE
我已经进一步研究了一下,我的实际问题似乎是我使用一个简单的函数作为回调到pika的SelectConnection.channel.basic_consume()函数.我的最后一个(未实现的)想法是传递线程函数,而不是常规函数,因此回调不会阻塞,消费者可以继续监听.