Chg*_*gad 7 django time celerybeat
我在 django 项目中处理 celery beat 任务,该项目定期创建数据库条目。我知道是因为当我像这样设置任务时:
芹菜.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
app = Celery("clock-backend", broker=os.environ.get("RABBITMQ_URL"))
app.config_from_object("django.conf:settings", namespace="CELERY")
app.conf.beat_schedule = {
'create_reports_monthly': {
'task': 'project_celery.tasks.create_reports_monthly',
'schedule': 10.0,
},
}
app.autodiscover_tasks()
Run Code Online (Sandbox Code Playgroud)
并开始我的项目,它真的每 10 秒创建一个对象。
但我真正想做的是将它设置为每个月的第一天运行。
这样做我会改变"schedule": crontab(0, 0, day_of_month="1")。
我的实际问题来了:我如何测试这是否真的有效?
通过测试,我指的是实际(单元)测试。
我尝试过的是使用名为freezegun的包。一个这样的测试看起来像这样:
def test_start_of_month_report_creation(self, user_object, contract_object, report_object):
# set time to the last day of January
with freeze_time("2019-01-31 23:59:59") as frozen_time:
# let one second pass
frozen_time.tick()
# give the celery task some time
time.sleep(20)
# Test Logic to check whether the object was created
# Example: assert MyModel.objects.count() > 0
Run Code Online (Sandbox Code Playgroud)
但这不起作用。我怀疑 celery beat 不使用通过 freezgun/python 设置的时间,而是使用真正的“硬件”时钟。
感谢您对此主题的任何评论、评论或帮助,因为我真的很想为此实施一个测试。
| 归档时间: |
|
| 查看次数: |
1135 次 |
| 最近记录: |