RabbitMQ在处理长时间运行的任务时关闭连接,超时设置产生错误

Gre*_*reg 11 python amqp rabbitmq pika python-pika

我正在使用RabbitMQ生产者向消费者发送长时间运行的任务(30分钟+).问题是,当关闭服务器的连接并且未确认的任务被重新排队时,消费者仍在处理任务.

从研究中我了解到心跳增加的连接超时 可以用来解决这个问题.这两种解决方案在尝试时都会引发错误.在阅读类似帖子的答案时,我还了解到,自发布答案后,RabbitMQ已经实施了许多更改(例如,默认心跳超时已从RabbitMQ 3.5.5之前的580更改为60).

指定心跳和阻止的连接超时时:

credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()
Run Code Online (Sandbox Code Playgroud)

显示以下错误:

TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'
Run Code Online (Sandbox Code Playgroud)

heartbeat_interval=1000在连接参数中指定时,会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

同样,socket_timeout = 1000显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

我在Ubuntu 14.04上运行RabbitMQ 3.6.1,pika 0.10.0和python 2.7.

  1. 为什么上述方法会产生错误?
  2. 在有长期连续任务的情况下,是否可以使用心跳方法?例如,在执行需要30分钟以上的大型数据库连接时,是否可以使用心跳?我赞成心跳方法,很多时候很难判断数据库连接等任务需要多长时间.

我已经阅读了类似问题的答案

更新:从pika文档中运行代码会产生相同的错误.

Der*_*ley 7

我遇到了与您的系统相同的问题,在很长的任务中连接断开.

如果您的网络设置使得强制丢弃空闲TCP/IP连接,则心跳可能有助于保持连接活动.但是,如果情况并非如此,改变心跳将无济于事.

更改连接超时将无济于事.此设置仅在最初创建连接时使用.

我正在使用RabbitMQ生产者向消费者发送长时间运行的任务(30分钟+).问题是,当关闭服务器的连接并且未确认的任务被重新排队时,消费者仍在处理任务.

这有两个原因,你已经遇到过这两个原因:

  1. 即使在最好的情况下,连接也会随机丢失
  2. 由于重新排队的消息而重新启动进程可能会导致问题

部署RabbitMQ代码的任务范围从不到一秒到几个小时,我发现立即确认消息并使用状态消息更新系统最适合很长的任务,比如这样.

您需要有一个记录系统(可能带有数据库),以跟踪给定作业的状态.

当消费者接收消息并开始该过程时,它应立即确认该消息并向记录系统发送"已启动"状态消息.

当过程完成时,发送另一条消息说它已完成.

这不会解决掉线连接问题,但无论如何都无法100%解决.相反,它将阻止在断开连接时发生消息重新排队问题.

但是,这个解决方案确实引入了另一个问题:当长时间运行的进程崩溃时,你如何恢复工作?

基本答案是使用作业的记录系统(您的数据库)状态告诉您需要再次接受该工作.当应用程序启动时,请检查数据库以查看是否有未完成的工作.如果以适当的方式存在,恢复或重新启动该工作.