如何在 Django 中测试 celery period_task?

Nar*_*nik 8 python django cron celery periodic-task

我有一个简单的周期性任务:

from celery.decorators import periodic_task
from celery.task.schedules import crontab
from .models import Subscription

@periodic_task(run_every=crontab(minute=0, hour=0))
def deactivate_subscriptions():
    for subscription in Subscription.objects.filter(is_expired=True):
        print(subscription)
        subscription.is_active = False
        subscription.can_activate = False
        subscription.save()
Run Code Online (Sandbox Code Playgroud)

我想用测试来覆盖它。

我找到了有关如何测试简单任务的信息,例如@shared_task,但我找不到测试的示例@periodic_task

Nad*_*xan 0

使用函数apply(). 的文档apply()