如何将基于类的任务传递给CELERY_BEAT_SCHEDULE

Ole*_*sov 1 python django celery celerybeat class-based-tasks

正如在docs类中看到的那样,任务是表达复杂逻辑的公平方式.

但是,文档没有指定如何将基于闪亮的新创建的基于类的任务添加到您CELERY_BEAT_SCHEDULE(使用django)

我尝试过: celery.py

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, 'task_summary')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from payments.tasks.generic.payeer import PayeerPaymentChecker
    from payments.tasks.generic.ok_pay import OkPayPaymentChecker

    okpay_import = OkPayPaymentChecker()
    payeer_imprt = PayeerPaymentChecker()

    sender.add_periodic_task(60.0, okpay_import.s(),
                             name='OkPay import',
                             expires=30)

    sender.add_periodic_task(60.0, payeer_imprt.s(),
                             name='Payeer import',
                             expires=30)
Run Code Online (Sandbox Code Playgroud)

- 要么 -

payments/task_summary.py

from tasks.generic.import import OkPayPaymentChecker, PayeerPaymentChecker
 run_okpay = OkPayPaymentChecker()
 run_payeer = PayeerPaymentChecker()

CELERY_BEAT_SCHEDULE = {
# yes, i did try referring to the class here
'check_okpay_payments': {

    'task': 'payments.tasks.task_summary.run_okpay',
    'schedule': timedelta(seconds=60),
},
'check_payeer_payments': {
    'task': 'payments.task_summary.run_payeer',
    'schedule': timedelta(seconds=60),
},
}
Run Code Online (Sandbox Code Playgroud)

真的不知道该做什么,恢复到类似的东西: payments/task_summary.py/

from payments.tasks.generic.ok_pay import OkPayPaymentChecker
from payments.tasks.generic.payeer import PayeerPaymentChecker
from celery import shared_task


@shared_task
def run_payer():
    instance = PayeerPaymentChecker()
    return instance.run()


@shared_task
def run_okpay():
    instance = OkPayPaymentChecker()
    return instance.run()
Run Code Online (Sandbox Code Playgroud)

在线资源我已经检查过但没有帮助我/解决问题:

ptr*_*ptr 5

我花了一段时间才找到答案,因为这个问题在谷歌搜索结果中如此之高,我想我会把它放在这里为那些正在努力寻找答案的人们:

您可以像使用普通任务一样添加它,但使用类名.

CELERY_BEAT_SCHEDULE = {
    'my_task_name': {
        'task': 'mymodule.tasks.MyTaskClass',
        'schedule': timedelta(seconds=60),
},
Run Code Online (Sandbox Code Playgroud)

(这假设你有mymodule/tasks.py:

from celery import Task

class MyTaskClass(Task):

    def run(self, *args, **kwargs):
       ... stuff ...
Run Code Online (Sandbox Code Playgroud)