在pika/RabbitMQ中处理长时间运行的任务

jma*_*agh 49 rabbitmq pika

我们正在尝试建立一个基本的有向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取任务,处理它并确认该消息.

问题是,处理可能需要10-20分钟,而我们当时没有响应消息,导致服务器断开连接.

这是我们的消费者的一些伪代码:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

第一个任务完成后,在BlockingConnection内部的某处抛出异常,抱怨套接字已重置.此外,RabbitMQ日志显示消费者因未及时响应而断开连接(为什么重置连接而不是发送FIN很奇怪,但我们不会担心这一点).

我们搜索了很多,因为我们认为这是RabbitMQ的正常使用案例(有许多长期运行的任务应该在许多消费者中分开),但似乎没有其他人真正有这个问题.最后,我们偶然发现了一个线程,建议使用心跳并long_running_task()在单独的线程中生成心跳.

所以代码变成了:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
Run Code Online (Sandbox Code Playgroud)

这似乎有效,但它非常混乱.我们确定该ch对象是线程安全的吗?另外,假设long_running_task()正在使用该连接参数将任务添加到新队列(即,完成此长流程的第一部分,让我们将任务发送到第二部分).所以,线程正在使用该connection对象.这个线程安全吗?

更重要的是,这样做的首选方式是什么?我觉得这很麻烦,可能不是线程安全的,所以也许我们做得不对.谢谢!

Gav*_*Roy 23

现在,你最好的办法是关闭心跳,如果你长时间阻塞,这将使RabbitMQ无法关闭连接.我正在尝试在后台线程中运行pika的核心连接管理和IO循环,但它不够稳定,不能发布.

  • Pika`0.12.0`有更好的解决方案,请参阅[此答案](/sf/answers/3706635341/) (4认同)

Mr.*_*. C 10

我遇到了同样的问题.
我的解决方案是:

  1. 在服务器端关闭心跳
  2. 评估任务可以采取的最长时间
  3. 将客户端心跳超时设置为从步骤2获取的时间

为什么这个?

正如我测试以下情况:

案例一
  1. 服务器心跳开启,19世纪
  2. 客户未设置

当任务运行很长时间 - > 1800时,我仍然会收到错误

案例二
  1. 关闭服务器心跳
  2. 关闭客户端心跳

客户端没有错误,除了一个问题 - 当客户端崩溃时(我的操作系统重新启动了一些故障),仍然可以在Rabbitmq Management插件中看到tcp连接.而且令人困惑.

案例三
  1. 关闭服务器心跳
  2. 打开客户端心跳,将其设置为预见的最长运行时间

在这种情况下,我可以动态地改变不同客户的每一次热潮.事实上,我经常在经常崩溃的机器上设置心跳.此外,我可以通过Rabbitmq Manangement插件看到离线机器.

环境

操作系统:centos x86_64
pika:0.9.13
rabbitmq:3.3.1

  • 您可以尝试这样的事情:`params = pika.ConnectionParameters(host = self .__ host,port = self .__ port,credentials = credentials,heartbeat_interval = <your-interval-in-seconds>) (3认同)

小智 8

  1. 您可以connection.process_data_events()在您的 中定期调用long_running_task(connection),该函数在被调用时会向服务器发送心跳,并使 pika 客户端远离关闭。
  2. connection.process_data_events()在你的 pika 中设置心跳值大于 call period BlockingConnection


Luk*_*ken 8

请不要禁用心跳!

从Pika开始0.12.0,请使用此示例代码中描述的技术在单独的线程上运行长时间运行的任务,然后确认来自该线程的消息.


注:在RabbitMQ的团队监控rabbitmq-users邮件列表,只有时StackOverflow上回答问题.


Abd*_*eem 6

不要禁用心跳。
最好的解决方案是在单独的线程中运行任务,并将设置为prefetch_count1以便使用者仅使用这样的内容获得1条未确认的消息channel.basic_qos(prefetch_count=1)