芹菜工人 - 连接已关闭错误

sat*_*enu 3 python celery flask

我正在与 Flask 和远程 celery 工作人员一起工作,对于 celery 通信,我使用rabbitmq 作为消息代理。远程芹菜工作人员随机抛出错误,如下所示:-

[2020-09-03 13:49:59,390: CRITICAL/MainProcess] Couldn't ack 20, reason:RecoverableConnectionError(None, 'connection already closed', None, '')
Traceback (most recent call last):
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 131, in ack_log_error
    self.ack(multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 126, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\channel.py", line 1394, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\abstract_channel.py", line 56, in send_method
    raise RecoverableConnectionError('connection already closed')
amqp.exceptions.RecoverableConnectionError: connection already closed
Run Code Online (Sandbox Code Playgroud)

我正在使用 celery 版本 4。有关如何避免此错误的任何指示都会有所帮助。

sid*_*di7 6

当运行很长的任务时,我对 celery 版本 4.4.6 和rabbitmq 也有同样的问题。然后,我使用以下配置更改运行相同的任务,现在它可以工作了(我以独奏模式运行工作程序)。重要的配置似乎是代理心跳:https://www.rabbitmq.com/heartbeats.html。这应该禁用心跳,并且连接不应因错过心跳而重置。

CELERY_BROKER_HEARTBEAT = 0
Run Code Online (Sandbox Code Playgroud)

链接到 celery 文档:https://docs.celeryproject.org/en/v4.4.6/userguide/configuration.html#std :setting-broker_heartbeat

与 Flask 的集成应该像这样工作:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] =
                            'amqp://myuser:mypassword@localhost:5672/myvhost'
app.config['CELERY_BROKER_HEARTBEAT'] = 0

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
Run Code Online (Sandbox Code Playgroud)

我可以在这里推荐这个博客(代码片段来源): https: //blog.miguelgrinberg.com/post/using-celery-with-flask

  • 可以确认这是要修复的解决方案。对于 celery 5x 执行``broker_pool_limit = None task_acks_late = Truebroker_heartbeat = 0worker_prefetch_multiplier = 1``` (2认同)