当我使用pika(python)向RabbitMQ尝试ack消息时出现错误"未知传递标记"

sol*_*117 20 python message-queue rabbitmq pika

我想在几个线程中使用进程消息但是在执行此代码时我遇到错误:

from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()

    except Exception, e:
        traceback.print_exc()
        continue
Run Code Online (Sandbox Code Playgroud)

错误说明:

Traceback (most recent call last):
  File "C:\work\projects\mq\start.py", line 43, in 
    method_frame, header_frame, body = channel.basic_get(queue="test_queue")
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack)
  File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get
    no_ack=no_ack))
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method
    self.connection.process_data_events()
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events
    self._handle_read()
  File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read
    self._on_data_available(data)
  File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available
    frame)                 # Args
  File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process
    callback(*args, **keywords)
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close
    frame.method.reply_text)
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204')

版本:pika 0.9.5,rabbitMQ 2.6.1

Mai*_*ori 41

问题可能是你的设置no_ack=True如下:

consumer_tag = channel.basic_consume(
    message_delivery_event,
    no_ack=True,
    queue=queue,
)
Run Code Online (Sandbox Code Playgroud)

然后确认消息:

channel.basic_ack(delivery_tag=args.delivery_tag)
Run Code Online (Sandbox Code Playgroud)

如果要确认或不确认,则必须选择并设置正确的消耗参数.


Dav*_*ith 9

对我来说,只是我告诉队列我不打算,然后我说.

:

channel.basic_consume(callback, queue=queue_name, no_ack=True)
Run Code Online (Sandbox Code Playgroud)

然后在我的回调中:

def callback(ch, method, properties, body):
  # do stuff
  ch.basic_ack(delivery_tag = method.delivery_tag)
Run Code Online (Sandbox Code Playgroud)

:

channel.basic_consume(callback, queue=queue_name, no_ack=False)
Run Code Online (Sandbox Code Playgroud)

底线:如果您想手动确认,请设置no_ack = False.

来自文档:

no_ack:(bool)如果设置为True,将使用自动确认模式(请参阅http://www.rabbitmq.com/confirms.html)


小智 2

我没有修复方法,但我可以使用 BlockingConnection 适配器验证它是否发生。

当确认或拒绝响应channel.basic_recover()而重新传送的消息时,它始终会发生

鼠兔 0.9.5、rabbitMQ 2.2.0、python 2.7 和 Erlang R14B01

我采取的解决方法是始终指定 Deliver_tag=0

我怀疑只有当您正在确认/确认的消息是您读过的最后一条消息(在流中)时,这才有效。我正在编写的库以一种可以独立确认每个消息的方式抽象消息,这打破了这个解决方案。

谁能确认这个问题是否已得到修复或得到 Pika 团队中任何人的认可?或者,这可能是 RabbitMQ 的问题吗?