KeyError 在注册任务时在 celery 上收到类型为“”的未注册任务

Ash*_*ian 10 python django celery celery-task python-3.x

我对芹菜配置​​有点陌生。

我有一个名为myapp.tasks.my_task例如的任务。

myapp.tasks.my_task当我使用 时,我可以在 celery 的注册任务中看到celery inspect registered。不是说任务注册成功了吗?为什么它会引发以下错误:

KeyError celery.worker.consumer.consumer in on_task_received

Received unregistered task of type 'my_app.tasks.my_task'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)

Run Code Online (Sandbox Code Playgroud)

还有其他任务my_app.tasks,它们工作正常,但只有这个任务不起作用并得到KeyError

@shared_task(queue='celery')
def other_task():
   """ WORKS """
   ...

@shared_task(queue='celery')
def my_task():
   """ DOES NOT WORK """
   ...

Run Code Online (Sandbox Code Playgroud)

Nie*_*ano 8

my_app.tasks.my_task这意味着Celery在调用任务时找不到任务的实现。您可能需要查看一些可能的解决方案:

可能的解决方案1:

您可能也没有正确配置:

  • 芹菜进口例如celery_app.conf.update(imports=['my_app.tasks'])celery_app.conf.imports = ['my_app.tasks']
  • 或者芹菜包括示例)例如celery_app = Celery(..., include=['my_app.tasks'])

注意:如果在 Django 应用程序中,如果已使用,则可以跳过此步骤celery_app.autodiscover_tasks(),因为任务会在该位置自动发现./<app_name>/tasks.py

可能的解决方案2:

如果您只导入my_app例如celery_app.conf.update(imports=['my_app']),那么我假设您有一个文件my_app/__init__.py确保在该文件内,它会导入任务my_app.tasks.my_task,以便my_app.tasks.other_taskcelery 应用程序知道此类任务存在。

# Contents of my_app/__init__.py
from my_app.tasks import (
    my_task,
    other_task,
)
Run Code Online (Sandbox Code Playgroud)

可能的解决方案3:

如果my_task是新添加的(而other_task已经是旧的现有任务),您可能还没有重新启动 celery 工作线程来查看新任务。尝试重新启动工作程序。


Ash*_*ian 5

那天,当我遇到这个问题时,不幸的是,一位前辈以蘑菇管理方式为我解决了这个问题(请参阅此处有关此反模式的更多信息)。我最近又回到了这个问题,以在我们自己的项目域中找出解决方案。

正如尼尔在他/她的解决方案中指出的那样,我们在我们的项目中使用celery_app.autodiscover_tasks(),在这种情况下,我们应该像下面这样my_task导入包。__init__.pytasks

from .some_tasks_file import my_task
Run Code Online (Sandbox Code Playgroud)

另外,我们使用了 celerybeat,并且内部定义的任务app.conf.beat_schedule必须具有函数的精确路径,如下所示(即使该函数是从__init__.py包中导入的tasks)。

app.conf.beat_schedule = {
    'MY_TASK': {
        'task': 'myapp.tasks.some_tasks_file.my_task',
        'schedule': 60,  # every minute
    },
}
Run Code Online (Sandbox Code Playgroud)

希望这能帮助有相同芹菜配置和问题的人。