sat*_*enu 3 python celery flask
我正在与 Flask 和远程 celery 工作人员一起工作,对于 celery 通信,我使用rabbitmq 作为消息代理。远程芹菜工作人员随机抛出错误,如下所示:-
[2020-09-03 13:49:59,390: CRITICAL/MainProcess] Couldn't ack 20, reason:RecoverableConnectionError(None, 'connection already closed', None, '')
Traceback (most recent call last):
File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 131, in ack_log_error
self.ack(multiple=multiple)
File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 126, in ack
self.channel.basic_ack(self.delivery_tag, multiple=multiple)
File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\channel.py", line 1394, in basic_ack
spec.Basic.Ack, argsig, (delivery_tag, multiple),
File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\abstract_channel.py", line 56, in send_method
raise RecoverableConnectionError('connection already closed')
amqp.exceptions.RecoverableConnectionError: connection already closed
Run Code Online (Sandbox Code Playgroud)
我正在使用 celery 版本 4。有关如何避免此错误的任何指示都会有所帮助。
当运行很长的任务时,我对 celery 版本 4.4.6 和rabbitmq 也有同样的问题。然后,我使用以下配置更改运行相同的任务,现在它可以工作了(我以独奏模式运行工作程序)。重要的配置似乎是代理心跳:https://www.rabbitmq.com/heartbeats.html。这应该禁用心跳,并且连接不应因错过心跳而重置。
CELERY_BROKER_HEARTBEAT = 0
Run Code Online (Sandbox Code Playgroud)
链接到 celery 文档:https://docs.celeryproject.org/en/v4.4.6/userguide/configuration.html#std :setting-broker_heartbeat
与 Flask 的集成应该像这样工作:
from flask import Flask
from celery import Celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] =
'amqp://myuser:mypassword@localhost:5672/myvhost'
app.config['CELERY_BROKER_HEARTBEAT'] = 0
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
Run Code Online (Sandbox Code Playgroud)
我可以在这里推荐这个博客(代码片段来源): https: //blog.miguelgrinberg.com/post/using-celery-with-flask