相关疑难解决方法(0)

在pika/RabbitMQ中处理长时间运行的任务

我们正在尝试建立一个基本的有向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取任务,处理它并确认该消息.

问题是,处理可能需要10-20分钟,而我们当时没有响应消息,导致服务器断开连接.

这是我们的消费者的一些伪代码:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

第一个任务完成后,在BlockingConnection内部的某处抛出异常,抱怨套接字已重置.此外,RabbitMQ日志显示消费者因未及时响应而断开连接(为什么重置连接而不是发送FIN很奇怪,但我们不会担心这一点).

我们搜索了很多,因为我们认为这是RabbitMQ的正常使用案例(有许多长期运行的任务应该在许多消费者中分开),但似乎没有其他人真正有这个问题.最后,我们偶然发现了一个线程,建议使用心跳并long_running_task()在单独的线程中生成心跳.

所以代码变成了:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag …
Run Code Online (Sandbox Code Playgroud)

rabbitmq pika

49
推荐指数
5
解决办法
2万
查看次数

标签 统计

pika ×1

rabbitmq ×1