Lee*_* He 5 python rabbitmq pika
在发布消息之前,代码会在进行实际发布之前检查 RabbitMQ 连接是否未关闭(请参阅下面的调用代码)。然而,当这个检查完成时,似乎出现了一些奇怪的 TypeError。
更多细节:
pika.SelectConnection)侦听特定队列。pika.BlockingConnection)将其他消息发布到另一个队列。rmq_consumer_connection.ioloop.start()是一个阻塞操作(因此不能做等效的rmq_producer_connection.ioloop.start())。我可能会考虑在未来进行优化(例如,为消费者和生产者使用单独的线程,这样两者都可以是异步的)。堆栈跟踪如下:
Apr 05 19:25:16 prod python[23326]: job|callback_user_queue|ERROR| Exception in callback_user_queue(..)
Apr 05 19:25:16 prod python[23326]: Traceback (most recent call last):
Apr 05 19:25:16 prod python[23326]: File "/opt/app/gateway_messenger.py", line 282, in send_to_gateway
Apr 05 19:25:16 prod python[23326]: gv.rmq_producer_connection.process_data_events()
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 751, in process_data_events
Apr 05 19:25:16 prod python[23326]: with _IoloopTimerContext(time_limit, self._impl) as timer:
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 188, in __enter__
Apr 05 19:25:16 prod python[23326]: self._callback_result.signal_once)
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/base_connection.py", line 112, in add_timeout
Apr 05 19:25:16 prod python[23326]: return self.ioloop.add_timeout(deadline, callback_method)
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 364, in add_timeout
Apr 05 19:25:16 prod python[23326]: return self._timer.call_later(deadline, callback_method)
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 199, in call_later
Apr 05 19:25:16 prod python[23326]: heapq.heappush(self._timeout_heap, timeout)
Apr 05 19:25:16 prod python[23326]: TypeError: heap argument must be a list
Run Code Online (Sandbox Code Playgroud)
调用代码如下:
def send_to_gateway(uid, data):
...(non-relevant code)...
try:
gv.rmq_producer_connection.process_data_events() <-- fails here
except pika.exceptions.ConnectionClosed:
logger.warn("RMQ producer connection is closed. Re-opening...")
initialise_producer()
...(non-relevant code)...
def initialise_producer():
logger.info("Initialising RabbitMQ producer...")
gv.rmq_producer_connection = pika.BlockingConnection(
pika.ConnectionParameters(gv.config['rmq_ip']))
gv.rmq_producer_channel = gv.rmq_producer_connection.channel()
gv.rmq_producer_channel.queue_declare(queue=gv.config['rmq_queue'],
durable=True)
logger.info("Initialising RabbitMQ producer done!")
Run Code Online (Sandbox Code Playgroud)
预期结果:
| 归档时间: |
|
| 查看次数: |
761 次 |
| 最近记录: |