Gre*_*reg 11 python amqp rabbitmq pika python-pika
我正在使用RabbitMQ生产者向消费者发送长时间运行的任务(30分钟+).问题是,当关闭服务器的连接并且未确认的任务被重新排队时,消费者仍在处理任务.
从研究中我了解到心跳或增加的连接超时 可以用来解决这个问题.这两种解决方案在尝试时都会引发错误.在阅读类似帖子的答案时,我还了解到,自发布答案后,RabbitMQ已经实施了许多更改(例如,默认心跳超时已从RabbitMQ 3.5.5之前的580更改为60).
指定心跳和阻止的连接超时时:
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
Run Code Online (Sandbox Code Playgroud)
显示以下错误:
TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'
Run Code Online (Sandbox Code Playgroud)
heartbeat_interval=1000在连接参数中指定时,会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'
同样,socket_timeout = 1000显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'
我在Ubuntu 14.04上运行RabbitMQ 3.6.1,pika 0.10.0和python 2.7.
我已经阅读了类似问题的答案
我遇到了与您的系统相同的问题,在很长的任务中连接断开.
如果您的网络设置使得强制丢弃空闲TCP/IP连接,则心跳可能有助于保持连接活动.但是,如果情况并非如此,改变心跳将无济于事.
更改连接超时将无济于事.此设置仅在最初创建连接时使用.
我正在使用RabbitMQ生产者向消费者发送长时间运行的任务(30分钟+).问题是,当关闭服务器的连接并且未确认的任务被重新排队时,消费者仍在处理任务.
这有两个原因,你已经遇到过这两个原因:
部署RabbitMQ代码的任务范围从不到一秒到几个小时,我发现立即确认消息并使用状态消息更新系统最适合很长的任务,比如这样.
您需要有一个记录系统(可能带有数据库),以跟踪给定作业的状态.
当消费者接收消息并开始该过程时,它应立即确认该消息并向记录系统发送"已启动"状态消息.
当过程完成时,发送另一条消息说它已完成.
这不会解决掉线连接问题,但无论如何都无法100%解决.相反,它将阻止在断开连接时发生消息重新排队问题.
但是,这个解决方案确实引入了另一个问题:当长时间运行的进程崩溃时,你如何恢复工作?
基本答案是使用作业的记录系统(您的数据库)状态告诉您需要再次接受该工作.当应用程序启动时,请检查数据库以查看是否有未完成的工作.如果以适当的方式存在,恢复或重新启动该工作.
| 归档时间: |
|
| 查看次数: |
3457 次 |
| 最近记录: |