vgo*_*ani 0 python multithreading rabbitmq kombu pika
我正在使用kombu通过生产者/消费者模型管理RabbitMQ.我启动了我的生产者,它在队列中放置了100个作业(我只有一个队列和一个交换).我想同时启动多个消费者,让每个消费者一次处理一个工作.不幸的是,消费者互相阻挠(即,当一个消费者从队列中抓取一份工作时,其他消费者只是闲置着).如果我杀死了工作消费者,那么其他一个消费者就会开始工作并开始工作.有没有办法让所有消费者同时运行,每个消费者从队列中处理不同的工作?我的消费者代码如下:
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
callbacks=[]
queues=[]
callbacks.append(self._callback)
queues.append(self.incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
self.incoming_exchange(settings.rabbitmq_connection).declare()
self.incoming_queue(settings.rabbitmq_connection).declare()
with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
while True:
try:
self.rabbitmq_connection.drain_events()
except Exception as e:
print 'Error -> %s' % e.message
Run Code Online (Sandbox Code Playgroud)
您需要将消费者预取设置为1(https://kombu.readthedocs.org/en/latest/reference/kombu.transport.pyamqp.html#kombu.transport.pyamqp.Connection.Channel.basic_qos),每个方式消费者将只获取1条消息,并将其余消息留在队列中并准备好状态,因此如果您有2个消费者,其中QOS设置为1并且您有100条消息,则您将处理2个同时执行的任务.
我已将缺少的部分添加到您的代码中,以设置预取计数
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
callbacks=[]
queues=[]
callbacks.append(self._callback)
queues.append(self.incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
self.incoming_exchange(settings.rabbitmq_connection).declare()
self.incoming_queue(settings.rabbitmq_connection).declare()
channel = self.rabbitmq_connection.channel()
channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)
with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
while True:
try:
self.rabbitmq_connection.drain_events()
except Exception as e:
print 'Error -> %s' % e.message
Run Code Online (Sandbox Code Playgroud)