Getting "pika.exceptions.ConnectionClosed" error while using rabbitmq in python

sau*_*sau 4 python rabbitmq pika

I am following the "hello world" tutorial: http://www.rabbitmq.com/tutorials/tutorial-two-python.html. My worker.py looks like this:

import pika
import time


connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

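I have used this code in my own project. Everything runs smoothly until a certain point in the queue, where an exception is raised right after printing [x] Done: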

Traceback (most recent call last):
  File "hullworker2.py", line 242, in <module>
    channel.basic_consume(callback,queue='test_queue2')
  File "/usr/local/lib/python2.7/dist-packages/pika/channel.py", line 211, in basic_consume
    {'consumer_tag': consumer_tag})])
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 904, in _rpc
    self.connection.process_data_events()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 88, in process_data_events
    if self._handle_read():
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 184, in _handle_read
    super(BlockingConnection, self)._handle_read()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 300, in _handle_read
    return self._handle_error(error)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 264, in _handle_error
    self._handle_disconnect()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 181, in _handle_disconnect
    self._on_connection_closed(None, True)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 232, in _on_connection_closed
    self._channels[channel]._on_close(method_frame)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 817, in _on_close
    self._send_method(spec.Channel.CloseOk(), None, False)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 920, in _send_method
    self.connection.send_method(self.channel_number, method_frame, content)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 120, in send_method
    self._send_method(channel_number, method_frame, content)
  File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1331, in _send_method
    self._send_frame(frame.Method(channel_number, method_frame))
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 245, in _send_frame
    super(BlockingConnection, self)._send_frame(frame_value)
  File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1312, in _send_frame
    raise exceptions.ConnectionClosed
pika.exceptions.ConnectionClosed

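I don't understand how the connection gets closed on its own in the middle of processing. The worker handles around 100 messages from the queue without trouble, and then this error suddenly appears. Any help is appreciated.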

Vor*_*Vor 8

There is a concept called heartbeats. It is basically how the server makes sure the client is still connected.

When you do

time.sleep( body.count('.') )

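you block your code for N seconds. That means that if the server sends a heartbeat frame to check whether your client is still alive, it will not get a response, because your code is blocked and has no idea a heartbeat arrived.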
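To make that concrete, here is a toy simulation in Python 3 that involves neither pika nor RabbitMQ. `FakeBroker`, `blocking_sleep`, `serviced_sleep` and the 10-second timeout are all made up for illustration; the real timeout is negotiated between client and server and the real check is more involved. It only models the idea that a client which stays silent for longer than the heartbeat timeout is treated as dead.

```python
class FakeBroker:
    """Toy stand-in for the server side of heartbeat checking (not RabbitMQ code)."""

    def __init__(self, heartbeat_timeout):
        self.heartbeat_timeout = heartbeat_timeout
        self.last_heard = 0.0   # simulated time of the last frame from the client
        self.closed = False

    def check(self, now):
        """Drop the connection if the client has been silent for too long."""
        if now - self.last_heard > self.heartbeat_timeout:
            self.closed = True

    def client_frame(self, now):
        """The client sent something (for example a heartbeat) at time `now`."""
        self.check(now)
        if not self.closed:
            self.last_heard = now


def blocking_sleep(broker, start, duration):
    """Models time.sleep(): nothing reaches the broker until the sleep is over."""
    end = start + duration
    broker.check(end)
    return end


def serviced_sleep(broker, start, duration, step=1.0):
    """Models a sleep that wakes up in small steps and keeps talking to the broker."""
    now = start
    end = start + duration
    while now < end:
        now = min(now + step, end)
        broker.client_frame(now)
    return now


silent = FakeBroker(heartbeat_timeout=10)
blocking_sleep(silent, start=0.0, duration=30)

chatty = FakeBroker(heartbeat_timeout=10)
serviced_sleep(chatty, start=0.0, duration=30)

print(silent.closed, chatty.closed)  # prints: True False
```

The silent client is dropped after its 30-second sleep, while the one that checks in every second stays connected for the same total sleep time.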

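Instead of time.sleep() you should use connection.sleep(). It also makes your code "sleep" for N seconds, but while doing so it keeps communicating with the server and responds to it.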

 sleep(duration)

    A safer way to sleep than calling time.sleep() directly which will keep the adapter from ignoring frames sent from RabbitMQ. The connection will “sleep” or block the number of seconds specified in duration in small intervals.
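Applied to the worker from the question, the change could look roughly like the sketch below. `make_callback` and `main` are names I made up so the callback can get at the connection object; they are not part of pika. The `basic_consume` call uses the keyword arguments from pika 1.x, which is an assumption on my part. With the older release shown in the traceback, the call would stay `basic_consume(callback, queue='task_queue')` as in the question.

```python
def make_callback(connection):
    """Build a consumer callback that sleeps via the connection, not time.sleep()."""

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % (body,))
        # One second of simulated work per dot, as in the tutorial worker.
        # `body` is bytes on Python 3, so count the byte b'.'.
        seconds = body.count(b'.')
        # BlockingConnection.sleep() keeps handling frames from the server
        # while it waits, which time.sleep() does not.
        connection.sleep(seconds)
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback


def main():
    # Imported here so make_callback() can be tried out without pika or a broker.
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.basic_qos(prefetch_count=1)
    # Assumed pika 1.x signature; older versions take the callback first.
    channel.basic_consume(queue='task_queue',
                          on_message_callback=make_callback(connection))
    channel.start_consuming()
```

Call `main()` with RabbitMQ running on localhost to start the worker. The only real difference from the original worker.py is that the callback calls `connection.sleep()` where it used to call `time.sleep()`.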

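  • Although this answer solves the problem in the question, I don't think it addresses the issue of long-running tasks and heartbeats. (2 upvotes)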