Mri*_*lla 68 python django celery celery-task django-celery
我正在使用芹菜和django芹菜.我已经定义了一个我想测试的周期性任务.是否可以手动从shell运行周期性任务,以便查看控制台输出?
Pla*_*ure 92
您是否尝试过从Django shell运行任务?您可以使用.apply任务的方法来确保它在热切和本地运行.
假设在子模块my_task中的Django应用程序myapp中调用该任务tasks:
$ python manage.py shell
>>> from myapp.tasks import my_task
>>> eager_result = my_task.apply()
Run Code Online (Sandbox Code Playgroud)
结果实例具有与通常AsyncResult类型相同的API ,除了始终热切地和本地评估结果,并且该.apply()方法将阻塞,直到任务运行完成.
W.P*_*rin 12
如果您的意思是仅在不满足条件(例如,不满足定期时间)时触发任务。您可以分两步完成。
1.获取任务ID。
您可以通过键入来完成。
celery inspect registered
Run Code Online (Sandbox Code Playgroud)
您会看到类似的内容app.tasks.update_something。如果没有,则可能celery是尚未启动。只是运行它。
2.运行任务 celery call
celery call app.tasks.update_something
Run Code Online (Sandbox Code Playgroud)
有关更多详细信息,只需键入
celery --help
celery inspect --help
celery call --help
Run Code Online (Sandbox Code Playgroud)
我想你需要打开两个shell:一个用于从Python/Django shell执行任务,另一个用于running celery worker(python manage.py celery worker).正如之前的回答所说,你可以使用apply()或运行任务apply_async()
我已编辑了答案,因此您没有使用已弃用的命令.