There are eight tasks in total that run at different times in Celery. All of them are event-driven: a task is fired when a particular event occurs, and it keeps running until certain conditions are met.
I have registered a task that checks some conditions for roughly two minutes. It works correctly most of the time, but sometimes the expected behaviour does not happen.
The task signature is as follows:

tasks.py
import time
from celery import shared_task

@shared_task()
def some_celery_task(a, b):
    main_time_end = time.time() + 120
    while time.time() < main_time_end:
        ...
        # some db operations here with given function arguments 'a' and 'b'
        # this part of the task gets executed most of the time
        if time.time() > main_time_end:
            ...
            # some db operations here.
            # this part is the part of the task that doesn't get executed sometimes
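Purely as a hedged reading of the snippet above (the question does not show the indentation, so this is an assumption): if the final if block sits inside the while loop, it only runs when an iteration's own check happens after the two-minute deadline has passed; if the deadline passes in the gap between the bottom-of-iteration check and the next while test, the closing DB operations are skipped, which would match the intermittent behaviour. Moving the closing work after the loop guarantees it runs exactly once. do_db_work and finalize_db_work below are hypothetical placeholders, not the asker's code:

import time
from celery import shared_task

def do_db_work(a, b):
    ...  # placeholder for the per-iteration db operations

def finalize_db_work(a, b):
    ...  # placeholder for the closing db operations

@shared_task()
def some_celery_task(a, b):
    deadline = time.time() + 120
    while time.time() < deadline:
        do_db_work(a, b)
    finalize_db_work(a, b)   # runs exactly once, after the deadline has passed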
views.py
# the other part of the view …

What is the best way to share a SQLAlchemy session between a Pyramid application and Celery tasks while instantiating the database engine only once? I have looked at this answer here, but I don't want to create another engine (which also happens to be global), since that is not very DRY. Also, during Pyramid application startup the .ini settings are passed into the main function, so I would like to configure the engine that way while still being able to use it from all Celery tasks. Perhaps I am approaching the problem the wrong way when integrating Celery with Pyramid? Thanks for your help!
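A minimal sketch of the shape being described, with all module and setting names assumed (an illustration, not the asker's code): one module owns the engine and session factory, Pyramid's main(global_config, **settings) configures it once from the parsed .ini settings, and the Celery tasks import the same factory. A Celery worker does not run Pyramid's main(), so it would have to call the same configuration step itself (for example from the same .ini file), and each forked worker process should generally get its own engine rather than reusing connections created before the fork.

# shared_db.py -- hypothetical shared module; both the Pyramid app and the
# Celery tasks import Session from here, so the engine is configured once.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

Session = sessionmaker()

def configure_db(settings):
    # called once with the parsed .ini settings (e.g. from Pyramid's main());
    # the Celery worker would call it too during its own startup
    engine = create_engine(settings['sqlalchemy.url'])
    Session.configure(bind=engine)
    return engine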
Does Celery have a limit on a task's eta? I want to execute the foo method after 12 days; will Celery have any problem with that, or do I need to configure any Celery settings for such a long eta?
next_run = datetime.now() + timedelta(days = 12)
foo.apply_async(args=[], eta = next_run)
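One setting that is often relevant for ETAs this long, offered as a hedged note rather than the asker's setup: with a Redis broker, the transport's visibility timeout needs to exceed the longest planned eta/countdown, otherwise the message can be redelivered to another worker and the task executed more than once. A sketch of that setting (app name and broker URL assumed, Celery 4+ lowercase configuration style):

from celery import Celery

# Hypothetical app and broker URL; broker_transport_options is the point here.
app = Celery('proj', broker='redis://localhost:6379/0')
app.conf.broker_transport_options = {
    'visibility_timeout': 60 * 60 * 24 * 13,   # 13 days in seconds, longer than the 12-day eta
}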
When I try to start the worker, I get this error:
ImportError: No module named 'project'
Traceback (most recent call last):
File "/usr/local/bin/celery", line 11, in <module>
sys.exit(main())
File "/usr/local/lib/python3.5/dist-packages/celery/__main__.py", line 16, in main
_main()
File "/usr/local/lib/python3.5/dist-packages/celery/bin/celery.py", line 322, in main
cmd.execute_from_commandline(argv)
File "/usr/local/lib/python3.5/dist-packages/celery/bin/celery.py", line 496, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 273, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 479, in setup_app_from_commandline
self.app = self.find_app(app)
File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 501, in find_app
return find_app(app, symbol_by_name=self.symbol_by_name)
File "/usr/local/lib/python3.5/dist-packages/celery/app/utils.py", line 359, in find_app
sym = symbol_by_name(app, imp=imp)
File "/usr/local/lib/python3.5/dist-packages/celery/bin/base.py", line 504, in symbol_by_name
return imports.symbol_by_name(name, imp=imp)
File …
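The traceback alone does not show the command line or the project layout, so purely as a hypothetical illustration: for a command like celery -A project worker -l info to work, a package named project must be importable from the directory the command is run in (or from PYTHONPATH), with the Celery app defined inside it, for example:

# project/celery.py -- hypothetical layout; the package name must match the
# value passed to "celery -A", and project/__init__.py must exist.
from celery import Celery

app = Celery('project', broker='redis://localhost:6379/0')  # broker URL assumed

With that layout, the worker would be started from the directory containing project/ with either celery -A project.celery worker, or celery -A project worker if project/__init__.py re-exports the app (from .celery import app).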
Revoking a task sent by @periodic_task

Beat keeps sending the due task, and the worker keeps discarding it as revoked:
[2018-09-17 12:23:50,864: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:23:50,864: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:00,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add)
[2018-09-17 12:24:00,869: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:00,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:10,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add)
[2018-09-17 12:24:10,868: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:10,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]
tasks.py
from datetime import timedelta
from subprocess import call
from celery.task import periodic_task          # import paths assumed for Celery 3.x; they vary by version
from celery.task.control import revoke

@periodic_task(run_every=timedelta(seconds=10), options={"task_id": "xxxxxxx"})
def add():
    call(["ping", "-c10", "google.com"])

def stop():
    x = revoke("xxxxxxx", terminate=True, signal="KILL")
    print(x)
…
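As a hedged aside on the snippet above (illustration only, not the asker's code): workers remember revoked task ids, so a periodic task that is always dispatched with the same hard-coded task_id keeps being discarded once that id has been revoked, which is what the log shows. Revoking an individual run by the id Celery generates for it avoids that; add and revoke are taken from the snippet above, everything else is assumed:

result = add.apply_async()                          # one ad-hoc run, with its own generated id
revoke(result.id, terminate=True, signal="KILL")    # revokes only this run, not future ones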
I have a running Celery 3.1 application that logs some sensitive information. Ideally, I would like to keep the same logs, but without the result part.

Currently it looks like:
worker_1 | [2019-12-10 13:46:40,052: INFO/MainProcess] Task xxxxx succeeded in 13.19569299298746s: yyyyyyy
I would like to have:
worker_1 | [2019-12-10 13:46:40,052: INFO/MainProcess] Task xxxxx succeeded in 13.19569299298746s
How can this be done?

Edit: it seems this could do the job: https://docs.celeryproject.org/en/3.1/reference/celery.worker.job.html#celery.worker.job.Request.success_msg but I don't know how to actually use it.
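One possible, untested way to use that attribute, assuming the Celery 3.1 internals match the linked page: success_msg is a %-style format-string class attribute on celery.worker.job.Request, so overriding it in a module that is imported before the worker starts (for example the module that creates the Celery app) and leaving out the %(return_value)s placeholder should drop the result from the success line. The available placeholders are an assumption based on the default message:

# hedged sketch, untested; assumes Celery 3.1, where the worker formats the
# task-succeeded log line from Request.success_msg
from celery.worker.job import Request

Request.success_msg = "Task %(name)s[%(id)s] succeeded in %(runtime)ss"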