如何从其他渠道恢复未确认的AMQP消息,而不是我自己的连接?

Wil*_*rys 46 amqp rabbitmq celery celeryd pika

似乎我让我的Rabbitmq服务器运行的时间越长,我对未确认消息的麻烦就越多.我很乐意将它们重新排列.实际上似乎有一个amqp命令来执行此操作,但它仅适用于您的连接使用的通道.我制作了一个小的鼠兔脚本,至少尝试一下,但是我要么缺少一些东西,要么就是这样做了(用rabbitmqctl怎么样?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    

try:
    connection = pika.SelectConnection(parameters=parameters,\
        on_open_callback=on_connected)    

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
Run Code Online (Sandbox Code Playgroud)

Bri*_*lly 67

未确认的消息是那些已通过网络传递给消费者但尚未被拒绝或被拒绝的消息 - 但该消费者尚未关闭其最初接收它们的频道或连接.因此,经纪人无法弄清楚消费者是否只是花了很长时间来处理这些消息,或者是否忘记了这些消息.因此,它使他们处于一种未被承认的状态,直到消费者死亡或他们被拒绝或被拒绝.

由于这些消息在将来仍然可以由最初使用它们的仍然存活的消费者进行有效处理,因此您不能(据我所知)将另一个消费者插入混合并尝试做出关于它们的外部决策.您需要修复您的消费者,以便在处理每条消息时做出决策,而不是将旧消息保留为未确认消息.

  • 我同意,我想听听你为什么认为芹菜如此严重破碎.它被广泛使用,这是我第一次听到这个抱怨. (8认同)
  • 这是一个很好的起诉书.如果你不介意我问,芹菜的AMQP实施怎么没有正确执行? (6认同)
  • @我将哀悼您正在使用Celery.芹菜开发人员根本不了解AMQP并且创建了一个严重破坏的实现.您需要做出选择,要么摆脱芹菜并做正确的AMQP,要么停止使用AMQP与芹菜,并使用像Redis这样简单的东西.我选择放弃芹菜并留在AMQP. (4认同)

Iva*_*anD 21

如果消息未被打包,则只有两种方法可以让它们重新进入队列:

  1. basic.nack

    此命令将使消息放回队列并重新传递.

  2. 断开与经纪人的联系

    此操作将强制将此通道中的所有未分组消息放回队列中.

注意:basic.recover将尝试在同一个通道(同一个消费者)上重新发布未经处理的消息,这有时是所需的行为.

RabbitMQ规范为basic.recover和basic.nack


真正的问题是:为什么消息不被确认?

导致未打包消息的可能方案:

  1. 消费者获取太多消息,然后没有足够快地处理和执行它们.

    解决方案:根据需要预取少量消息.

  2. Buggy客户端库(我目前有这个问题与pika 0.9.13.如果队列有很多消息,一定数量的消息将被解锁,甚至几小时后.

    解决方案:我必须多次重启消费者,直到所有未经消息的消息从队列中消失.

  • 最后找到报告问题的地方:https://github.com/pika/pika/issues/286 (3认同)

Ven*_*tra 6

一旦所有工人/消费者停止,所有未确认的消息将进入就绪状态。

确保所有员工都通过了确认停止grepps aux输出和停止/杀死他们,如果发现。

如果您使用 supervisor 管理工人,显示工人已停止,您可能需要检查僵尸。Supervisor 报告工作程序已停止,但当 grepped ps aux 输出时,您仍然会发现僵尸进程正在运行。杀死僵尸进程将使消息回到就绪状态。