在 RabbitMQ pika 阻塞连接上调用 process_data_events() 时出现“TypeError: heap argument must be a list”

Lee*_* He 5 python rabbitmq pika

在发布消息之前,代码会在进行实际发布之前检查 RabbitMQ 连接是否未关闭(请参阅下面的调用代码)。然而,当这个检查完成时,似乎出现了一些奇怪的 TypeError。

更多细节:

  • 异步使用者(即使用选择连接pika.SelectConnection)侦听特定队列。
  • 当消费者处理一条消息时,它将通过非异步生产者(即使用阻塞连接pika.BlockingConnection)将其他消息发布到另一个队列。
  • 消费者和生产者是独立的代码段,但位于相同的代码库中并在同一服务器上的同一进程(和线程)中运行。它们被分开是为了更好的代码解耦。
  • 只有消费者是异步的原因是因为调用rmq_consumer_connection.ioloop.start()是一个阻塞操作(因此不能做等效的rmq_producer_connection.ioloop.start())。我可能会考虑在未来进行优化(例如,为消费者和生产者使用单独的线程,这样两者都可以是异步的)。
  • 使用鼠兔 0.12.0、RabbitMQ 3.7.8、Python 3.5

堆栈跟踪如下:

Apr 05 19:25:16 prod python[23326]: job|callback_user_queue|ERROR| Exception in callback_user_queue(..)
Apr 05 19:25:16 prod python[23326]: Traceback (most recent call last):
Apr 05 19:25:16 prod python[23326]:   File "/opt/app/gateway_messenger.py", line 282, in send_to_gateway
Apr 05 19:25:16 prod python[23326]:     gv.rmq_producer_connection.process_data_events()
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 751, in process_data_events
Apr 05 19:25:16 prod python[23326]:     with _IoloopTimerContext(time_limit, self._impl) as timer:
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 188, in __enter__
Apr 05 19:25:16 prod python[23326]:     self._callback_result.signal_once)
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/base_connection.py", line 112, in add_timeout
Apr 05 19:25:16 prod python[23326]:     return self.ioloop.add_timeout(deadline, callback_method)
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 364, in add_timeout
Apr 05 19:25:16 prod python[23326]:     return self._timer.call_later(deadline, callback_method)
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 199, in call_later
Apr 05 19:25:16 prod python[23326]:     heapq.heappush(self._timeout_heap, timeout)
Apr 05 19:25:16 prod python[23326]: TypeError: heap argument must be a list
Run Code Online (Sandbox Code Playgroud)

调用代码如下:

def send_to_gateway(uid, data):
...(non-relevant code)...
    try:
        gv.rmq_producer_connection.process_data_events() <-- fails here
    except pika.exceptions.ConnectionClosed:
        logger.warn("RMQ producer connection is closed. Re-opening...")
        initialise_producer()
...(non-relevant code)...


def initialise_producer():
    logger.info("Initialising RabbitMQ producer...")

    gv.rmq_producer_connection = pika.BlockingConnection(
        pika.ConnectionParameters(gv.config['rmq_ip']))
    gv.rmq_producer_channel = gv.rmq_producer_connection.channel()
    gv.rmq_producer_channel.queue_declare(queue=gv.config['rmq_queue'],
                                          durable=True)
    logger.info("Initialising RabbitMQ producer done!")
Run Code Online (Sandbox Code Playgroud)

预期结果:

  • 检查过程中,如果RabbitMQ连接关闭,生产者会在消息发布前重新初始化。
  • 如果连接未关闭,则继续按正常方式发布消息。