Wil*_*rys 46 amqp rabbitmq celery celeryd pika
似乎我让我的Rabbitmq服务器运行的时间越长,我对未确认消息的麻烦就越多.我很乐意将它们重新排列.实际上似乎有一个amqp命令来执行此操作,但它仅适用于您的连接使用的通道.我制作了一个小的鼠兔脚本,至少尝试一下,但是我要么缺少一些东西,要么就是这样做了(用rabbitmqctl怎么样?)
import pika
credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
credentials=credentials, virtual_host='***')
def handle_delivery(body):
"""Called when we receive a message from RabbitMQ"""
print body
def on_connected(connection):
"""Called when we are fully connected to RabbitMQ"""
connection.channel(on_channel_open)
def on_channel_open(new_channel):
"""Called when our channel has opened"""
global channel
channel = new_channel
channel.basic_recover(callback=handle_delivery,requeue=True)
try:
connection = pika.SelectConnection(parameters=parameters,\
on_open_callback=on_connected)
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
Run Code Online (Sandbox Code Playgroud)
Bri*_*lly 67
未确认的消息是那些已通过网络传递给消费者但尚未被拒绝或被拒绝的消息 - 但该消费者尚未关闭其最初接收它们的频道或连接.因此,经纪人无法弄清楚消费者是否只是花了很长时间来处理这些消息,或者是否忘记了这些消息.因此,它使他们处于一种未被承认的状态,直到消费者死亡或他们被拒绝或被拒绝.
由于这些消息在将来仍然可以由最初使用它们的仍然存活的消费者进行有效处理,因此您不能(据我所知)将另一个消费者插入混合并尝试做出关于它们的外部决策.您需要修复您的消费者,以便在处理每条消息时做出决策,而不是将旧消息保留为未确认消息.
Iva*_*anD 21
如果消息未被打包,则只有两种方法可以让它们重新进入队列:
basic.nack
此命令将使消息放回队列并重新传递.
断开与经纪人的联系
此操作将强制将此通道中的所有未分组消息放回队列中.
注意:basic.recover将尝试在同一个通道(同一个消费者)上重新发布未经处理的消息,这有时是所需的行为.
RabbitMQ规范为basic.recover和basic.nack
真正的问题是:为什么消息不被确认?
导致未打包消息的可能方案:
消费者获取太多消息,然后没有足够快地处理和执行它们.
解决方案:根据需要预取少量消息.
Buggy客户端库(我目前有这个问题与pika 0.9.13.如果队列有很多消息,一定数量的消息将被解锁,甚至几小时后.
解决方案:我必须多次重启消费者,直到所有未经消息的消息从队列中消失.
一旦所有工人/消费者停止,所有未确认的消息将进入就绪状态。
确保所有员工都通过了确认停止grep对ps aux输出和停止/杀死他们,如果发现。
如果您使用 supervisor 管理工人,显示工人已停止,您可能需要检查僵尸。Supervisor 报告工作程序已停止,但当 grepped ps aux 输出时,您仍然会发现僵尸进程正在运行。杀死僵尸进程将使消息回到就绪状态。