标签: celerybeat

如何协调 celerybeat 守护进程集群?

我有一个由三台机器组成的集群。我想celery beat在这些上奔跑。我有几个相关问题。

\n\n
    \n
  1. Celery 有持久调度程序的概念。只要我的日程安排仅包含 crontab 条目并且由 静态定义CELERYBEAT_SCHEDULE,我是否需要保留它?
  2. \n
  3. 如果这样做,那么我是否必须确保该存储在集群的所有机器之间同步?
  4. \n
  5. 是否djcelery.schedulers.DatabaseScheduler自动处理并发节拍守护进程?也就是说,如果我只使用 运行三个节拍守护进程DatabaseScheduler,我是否可以免受重复任务的影响?
  6. \n
  7. 有没有类似DatabaseScheduler但基于 MongoDB 的东西,没有 Django ORM?像 Celery\xe2\x80\x99s 一样拥有 MongoDB 代理和结果后端。
  8. \n
\n

concurrency mongodb celery celerybeat

1
推荐指数
1
解决办法
1542
查看次数

django和芹菜击败调度程序没有数据库条目

我的问题是节拍调度程序不会在表'tasks'和'workers'中存储条目.我用django和芹菜.在我的数据库(MySQL)中,我添加了一个周期性的"估计区域",间隔为120秒.

这就是我开始工作的方式:

`python manage.py celery worker -n worker.node1 -B --loglevel=info &`
Run Code Online (Sandbox Code Playgroud)

在我启动工作人员之后,我可以在终端中看到工作人员正在工作,并且调度程序从数据库中挑选出定期任务并对其进行操作.

我的任务是如何定义的:

@celery.task(name='fv.tasks.estimateRegion',
             ignore_result=True,
             max_retries=3)
def estimateRegion(region):
Run Code Online (Sandbox Code Playgroud)

终端显示:

WARNING ModelEntry: Estimate Region fv.tasks.estimateRegion(*['ASIA'], **{}) {<freq: 2.00 minutes>}
[2013-05-23 10:48:19,166: WARNING/MainProcess] <ModelEntry: Estimate Region fv.tasks.estimateRegion(*['ASIA'], **{}) {<freq: 2.00 minutes>}>
INFO Calculating estimators for exchange:Bombay Stock Exchange
Run Code Online (Sandbox Code Playgroud)

任务"估计区域"返回一个results.csv文件,所以我可以看到工作者和节拍调度程序工作.但之后我的django管理面板中的"任务"或"工作人员"中没有数据库条目.

这是我在settings.py中的芹菜设置

`CELERY_DISABLE_RATE_LIMITS =真CELERY_TASK_SERIALIZER ='pickle'CELERY_RESULT_SERIALIZER ='pickle'CELERY_IMPORTS =('fv.tasks')CELERY_RESULT_PERSISTENT = True

# amqp settings
BROKER_URL = 'amqp://fv:password@localhost'
#BROKER_URL = 'amqp://fv:password@192.168.99.31'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_TASK_RESULT_EXPIRES = 18000
CELERY_ROUTES = (fv.routers.TaskRouter(), )
_estimatorExchange = Exchange('estimator')
CELERY_QUEUES = ( …
Run Code Online (Sandbox Code Playgroud)

mysql django celery celerybeat

1
推荐指数
1
解决办法
909
查看次数

如何将基于类的任务传递给CELERY_BEAT_SCHEDULE

正如在docs类中看到的那样,任务是表达复杂逻辑的公平方式.

但是,文档没有指定如何将基于闪亮的新创建的基于类的任务添加到您CELERY_BEAT_SCHEDULE(使用django)

我尝试过: celery.py

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, 'task_summary')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from payments.tasks.generic.payeer import PayeerPaymentChecker
    from payments.tasks.generic.ok_pay import OkPayPaymentChecker

    okpay_import = OkPayPaymentChecker()
    payeer_imprt = PayeerPaymentChecker()

    sender.add_periodic_task(60.0, okpay_import.s(),
                             name='OkPay import',
                             expires=30)

    sender.add_periodic_task(60.0, payeer_imprt.s(),
                             name='Payeer import',
                             expires=30)
Run Code Online (Sandbox Code Playgroud)

- 要么 -

payments/task_summary.py

from tasks.generic.import import OkPayPaymentChecker, PayeerPaymentChecker
 run_okpay = OkPayPaymentChecker()
 run_payeer = PayeerPaymentChecker()

CELERY_BEAT_SCHEDULE = {
# yes, i did try referring to the class here
'check_okpay_payments': {

    'task': 'payments.tasks.task_summary.run_okpay',
    'schedule': timedelta(seconds=60),
},
'check_payeer_payments': {
    'task': 'payments.task_summary.run_payeer',
    'schedule': timedelta(seconds=60), …
Run Code Online (Sandbox Code Playgroud)

python django celery celerybeat class-based-tasks

1
推荐指数
1
解决办法
2080
查看次数

我们如何完成 celery period_task 中的停止任务?

撤销 @periodic_task 发送的任务Discarding revoked tasks & Due task to workers.

芹菜工人屏幕截图

[2018-09-17 12:23:50,864: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:23:50,864: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx] [2018-09-17 12:24:00,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add) [2018-09-17 12:24:00,869: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:00,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx] [2018-09-17 12:24:10,865: INFO/Beat] Scheduler: Sending due task cimexapp.tasks.add (cimexapp.tasks.add) [2018-09-17 12:24:10,868: INFO/MainProcess] Received task: cimexapp.tasks.add[xxxxxxx]
[2018-09-17 12:24:10,869: INFO/MainProcess] Discarding revoked task: cimexapp.tasks.add[xxxxxxx]


任务.py

@periodic_task(run_every=timedelta(seconds=10),options={"task_id":"xxxxxxx"})
def add():
     call(["ping","-c10","google.com"])


def stop():
    x = revoke("xxxxxxx",terminate=True,signal="KILL")
    print(x) …
Run Code Online (Sandbox Code Playgroud)

django celery celery-task django-celery celerybeat

1
推荐指数
1
解决办法
6500
查看次数

安排任务每天在特定时间运行一次 | 芹菜

我想安排每天在 16 点 UTC 运行两个任务。

为此,我实现了这个 celery 配置:

from celery.schedules import crontab


CELERY_IMPORTS = ('api.tasks')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'

CELERYBEAT_SCHEDULE = {
    'book-task': {
        'task': 'api.tasks.get_data',
        # At 16h UTC everyday
        'schedule': crontab(hour=16),
        'args': ({'book'}),
    },
    'pencils-task': {
        'task': 'api.tasks.get_data',
        # At 16h UTC everyday
        'schedule': crontab(hour=16),
        'args': ({'pencil'}),
    }
}

Run Code Online (Sandbox Code Playgroud)

celery worker -A app.celery --loglevel=info --pool=solo 跑完后跑去跑芹菜工人celery beat -A app.celery 在运行启动芹菜节拍

通过上面的配置,我有两个任务从 UTC 16 点开始每分钟运行一次。我的配置有什么问题以及如何修复?

python config celery celerybeat

1
推荐指数
1
解决办法
2990
查看次数

芹菜击败args:列表与元组

芹菜文档描述了如何通过位置参数,以您的节拍计划任务列表或元组.

我有一个任务,只需一个参数,一个整数列表:

@shared_task
def schedule_by_ids(ids):
  ...
Run Code Online (Sandbox Code Playgroud)

我的celerybeat时间表如下:

CELERYBEAT_SCHEDULE = {
    'schedule_by_ids': {
        'task': 'myproj.app.tasks.schedule_by_ids',
        'schedule': crontab(minute='*/10', hour='8-21'),
        'args': ([1,]),
    },
}
Run Code Online (Sandbox Code Playgroud)

我的任务失败,"int不可迭代" TypeError.根据我的显示器(芹菜花),args传递为[1].

当我将args作为列表时,例如[[1]],arg显示在监视器中,[[1]]并且它工作正常.

我的问题是:当它是一个元组时,它是如何通过args的?为什么?

python celery django-celery celerybeat

0
推荐指数
1
解决办法
1449
查看次数