RabbitMQ pika.exceptions.ConnectionClosed

Mau*_*aud 7 python rabbitmq pika

我尝试使用RabbitMQ发送消息和接收消息.我没有计算机科学背景,我使用的术语不是很准确.

我尝试复制教程文件:提交我的html表单时,我的python脚本(cgi)消息正在提交到队列

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='task_queue', durable=True)
        message = PN
        channel.basic_publish(exchange='',
                              routing_key='task_queue',
                              body=message,
                              properties=pika.BasicProperties(
                                 delivery_mode = 2, # make message persistent
                              ))
        connection.close()
Run Code Online (Sandbox Code Playgroud)

我的接收器正在运行:

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received Project %r" % body)
    #ch.basic_ack(delivery_tag = method.delivery_tag) 
    if not (os.path.isfile(js_path)):
        print (' [*] ERROR files missing ')
        #ch.basic_ack(delivery_tag = method.delivery_tag)
        return
    p= subprocess.Popen(run a subprocess here)
    p.wait()

    print (' [*] Temporary Files removed')
    print(" [*] Waiting for messages. To exit press CTRL+C")

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='task_queue',no_ack=True)
channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

它管理大部分时间但随机崩溃,出现以下错误:

回溯(最近一次调用最后一次):文件"Receive5.py",139行,在channel.start_consuming()文件"C:\ Python27\lib\site-packages\pika\adapters\blocking_connection.py",第1681行, start_consuming self.connection.process_data_events(time_limit = None)文件"C:\ Python27\lib\site-packages\pika\adapters\blocking_connection.py",第647行,在process_data_events中self._flush_output(common_terminator)文件"C:\ Python27\lib\site-packages\pika\adapters\blocking_connection.py",第426行,在_flush_output中引发exceptions.ConnectionClosed()pika.exceptions.ConnectionClosed

ean*_*son 13

这是因为你保持主线程等待,因为这个pika无法处理传入的消息; 在这种情况下,在子进程完成之前,它无法响应心跳.这会导致RabbitMQ认为客户端已死并强制断开连接.

如果您希望使用心跳(建议使用),则需要定期呼叫connection.process_data_events.这可以通过添加一个循环来完成,该循环检查线程是否完成,并且每30秒左右调用一次,process_data_events直到线程完成.

  • 我会说这很大程度上没有记载.我创建自己的AMQP库以避免这个问题. (3认同)