我有一个由三台机器组成的集群。我想celery beat
在这些上奔跑。我有几个相关问题。
CELERYBEAT_SCHEDULE
,我是否需要保留它?djcelery.schedulers.DatabaseScheduler
自动处理并发节拍守护进程?也就是说,如果我只使用 运行三个节拍守护进程DatabaseScheduler
,我是否可以免受重复任务的影响?DatabaseScheduler
但基于 MongoDB 的东西,没有 Django ORM?像 Celery\xe2\x80\x99s 一样拥有 MongoDB 代理和结果后端。我的问题是节拍调度程序不会在表'tasks'和'workers'中存储条目.我用django和芹菜.在我的数据库(MySQL)中,我添加了一个周期性的"估计区域",间隔为120秒.
这就是我开始工作的方式:
`python manage.py celery worker -n worker.node1 -B --loglevel=info &`
Run Code Online (Sandbox Code Playgroud)
在我启动工作人员之后,我可以在终端中看到工作人员正在工作,并且调度程序从数据库中挑选出定期任务并对其进行操作.
我的任务是如何定义的:
@celery.task(name='fv.tasks.estimateRegion',
ignore_result=True,
max_retries=3)
def estimateRegion(region):
Run Code Online (Sandbox Code Playgroud)
终端显示:
WARNING ModelEntry: Estimate Region fv.tasks.estimateRegion(*['ASIA'], **{}) {<freq: 2.00 minutes>}
[2013-05-23 10:48:19,166: WARNING/MainProcess] <ModelEntry: Estimate Region fv.tasks.estimateRegion(*['ASIA'], **{}) {<freq: 2.00 minutes>}>
INFO Calculating estimators for exchange:Bombay Stock Exchange
Run Code Online (Sandbox Code Playgroud)
任务"估计区域"返回一个results.csv文件,所以我可以看到工作者和节拍调度程序工作.但之后我的django管理面板中的"任务"或"工作人员"中没有数据库条目.
这是我在settings.py中的芹菜设置
`CELERY_DISABLE_RATE_LIMITS =真CELERY_TASK_SERIALIZER ='pickle'CELERY_RESULT_SERIALIZER ='pickle'CELERY_IMPORTS =('fv.tasks')CELERY_RESULT_PERSISTENT = True
# amqp settings
BROKER_URL = 'amqp://fv:password@localhost'
#BROKER_URL = 'amqp://fv:password@192.168.99.31'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_TASK_RESULT_EXPIRES = 18000
CELERY_ROUTES = (fv.routers.TaskRouter(), )
_estimatorExchange = Exchange('estimator')
CELERY_QUEUES = ( …
Run Code Online (Sandbox Code Playgroud) 正如在docs类中看到的那样,任务是表达复杂逻辑的公平方式.
但是,文档没有指定如何将基于闪亮的新创建的基于类的任务添加到您CELERY_BEAT_SCHEDULE
(使用django)
我尝试过:
celery.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, 'task_summary')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
from payments.tasks.generic.payeer import PayeerPaymentChecker
from payments.tasks.generic.ok_pay import OkPayPaymentChecker
okpay_import = OkPayPaymentChecker()
payeer_imprt = PayeerPaymentChecker()
sender.add_periodic_task(60.0, okpay_import.s(),
name='OkPay import',
expires=30)
sender.add_periodic_task(60.0, payeer_imprt.s(),
name='Payeer import',
expires=30)
Run Code Online (Sandbox Code Playgroud)
- 要么 -
payments/task_summary.py
from tasks.generic.import import OkPayPaymentChecker, PayeerPaymentChecker
run_okpay = OkPayPaymentChecker()
run_payeer = PayeerPaymentChecker()
CELERY_BEAT_SCHEDULE = {
# yes, i did try referring to the class here
'check_okpay_payments': {
'task': 'payments.tasks.task_summary.run_okpay',
'schedule': timedelta(seconds=60),
},
'check_payeer_payments': {
'task': 'payments.task_summary.run_payeer',
'schedule': timedelta(seconds=60), …
Run Code Online (Sandbox Code Playgroud) 撤销 @periodic_task 发送的任务
Discarding revoked tasks & Due task to workers.
[2018-09-17 12:23:50,864: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:23:50,864: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:00,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add)
[2018-09-17 12:24:00,869: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:00,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:10,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add)
[2018-09-17 12:24:10,868: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:10,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]
任务.py
@periodic_task(run_every=timedelta(seconds=10),options={"task_id":"xxxxxxx"})
def add():
call(["ping","-c10","google.com"])
def stop():
x = revoke("xxxxxxx",terminate=True,signal="KILL")
print(x) …
Run Code Online (Sandbox Code Playgroud) 我想安排每天在 16 点 UTC 运行两个任务。
为此,我实现了这个 celery 配置:
from celery.schedules import crontab
CELERY_IMPORTS = ('api.tasks')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULE = {
'book-task': {
'task': 'api.tasks.get_data',
# At 16h UTC everyday
'schedule': crontab(hour=16),
'args': ({'book'}),
},
'pencils-task': {
'task': 'api.tasks.get_data',
# At 16h UTC everyday
'schedule': crontab(hour=16),
'args': ({'pencil'}),
}
}
Run Code Online (Sandbox Code Playgroud)
我celery worker -A app.celery --loglevel=info --pool=solo
跑完后跑去跑芹菜工人celery beat -A app.celery
在运行启动芹菜节拍
通过上面的配置,我有两个任务从 UTC 16 点开始每分钟运行一次。我的配置有什么问题以及如何修复?
将芹菜文档描述了如何通过位置参数,以您的节拍计划任务列表或元组.
我有一个任务,只需一个参数,一个整数列表:
@shared_task
def schedule_by_ids(ids):
...
Run Code Online (Sandbox Code Playgroud)
我的celerybeat时间表如下:
CELERYBEAT_SCHEDULE = {
'schedule_by_ids': {
'task': 'myproj.app.tasks.schedule_by_ids',
'schedule': crontab(minute='*/10', hour='8-21'),
'args': ([1,]),
},
}
Run Code Online (Sandbox Code Playgroud)
我的任务失败,"int不可迭代" TypeError
.根据我的显示器(芹菜花),args传递为[1]
.
当我将args作为列表时,例如[[1]]
,arg显示在监视器中,[[1]]
并且它工作正常.
我的问题是:当它是一个元组时,它是如何通过args的?为什么?
celery ×6
celerybeat ×6
django ×3
python ×3
celery-task ×1
concurrency ×1
config ×1
mongodb ×1
mysql ×1