当prefetch_count == 1时拒绝并重新执行RabbitMQ任务

smi*_*lli 3 python rabbitmq pika

假设我有一个包含五个项目的队列:

(tail) E, D, C, B, A (head)
Run Code Online (Sandbox Code Playgroud)

我从此队列的开头消耗了消息,但决定该消息A当前不适合处理。我reject将该项目添加到requeue=True,并且队列变为:

(tail) A, E, D, C, B (head)
Run Code Online (Sandbox Code Playgroud)

我再消费BCD,和Eack荷兰国际集团各一个。现在,该队列仅容纳A,而我reject在一个永无休止的循环中不断不断地消耗它。如果有新的非A消息进入,则几乎立即将其消耗掉,然后该过程将恢复尝试消耗的循环A

我对Pika文档中的Twisted Consumer示例进行了一些修改:

import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task


@defer.inlineCallbacks
def run(connection):

    channel = yield connection.channel()

    exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')

    queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)

    yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')

    #yield channel.basic_qos(prefetch_count=1)

    queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)

    l = task.LoopingCall(read, queue_object)

    l.start(0.01)


@defer.inlineCallbacks
def read(queue_object):

    ch,method,properties,body = yield queue_object.get()

    print body

    if body == 'A':
        yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    else:
        yield ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()
Run Code Online (Sandbox Code Playgroud)

问题:请注意以下注释行:

#yield channel.basic_qos(prefetch_count=1)
Run Code Online (Sandbox Code Playgroud)

当我取消注释时,消费者到达message时A,它将在reject读取后立即再次将其拾取,而忽略了它后面的队列中可能正在等待的其他任何项目。而不是将被拒绝的项目放在队列的尾部,它只是不断地反复尝试,完全阻塞了队列中的所有其他对象。

注释掉该行后,它可以正常运行(尽管速度稍慢)。如果该行存在和prefetch_count > 1,那么它也有效。将其设置为完全1触发此行为的某些事情。

我在拒绝消息时缺少步骤A吗?还是Pika的预取系统与这种边缘情况根本不兼容?

pin*_*ain 5

如果您只有一个使用者,那么RabbitMQ除了向被拒绝的那个使用者发送消息外别无其他方法(无论如何:使用basic.reject或basic.nack)。

设置prefetch_count > 1后,您的使用者将看到已循环的消息以及循环开头的新消息(从字面上看,您循环的消息将停留在最前面)。

If you get accidentally N*M looped messages withprefetch_count <= N and consumers number <= M you will have all messages being looped (which leads to burned down CPU and so on), so it might be a good catch to check rejected message flag and have some advanced logic if message was already redelivered.