sqe*_*sqe 8 python django cron runtime celery
我有一个简单的 Django (v3.1) 应用程序,我从表单接收数据,用视图处理它,然后将它传递给 Celery(v4.4.7,RabbitMQ 作为代理)。根据表单中提交的数据,它可以是一次性任务或周期性任务。
周期性任务应该执行与一次性任务相同的任务,但是,嗯,有一个周期性的计划。我想将该计划传递给任务,包括开始日期、结束日期和间隔(例如:每 2 天下午 4 点执行一次,从现在开始到 4 周)。
我的观点(当然,为了说明目的而缩短和重命名):
# views.py
if request.method == 'POST':
form = BackupForm(request.POST)
if form.is_valid():
data = ...
if not form.cleaned_data['periodic']:
# execute one-time task
celery_task = single_task.delay(data)
else:
schedule = {
'first_backup': form.cleaned_data['first_backup'],
'last_backup': form.cleaned_data['last_backup'],
'intervall_every': form.cleaned_data['intervall_every'],
'intervall_unit': form.cleaned_data['intervall_unit'],
'intervall_time': form.cleaned_data['intervall_time'],
}
# execute periodic task, depending on the schedule submitted in the form
celery_task = single_task.delay(data, schedule=schedule)
return HttpResponseRedirect(reverse('app:index'))
Run Code Online (Sandbox Code Playgroud)
单个任务如下所示:
# tasks.py
@shared_task
def single_task(data: dict, **kwargs) -> None:
asyncio.run(bulk_screen(data=data))
# TODO: receive schedule if periodic and create a periodic task with it
Run Code Online (Sandbox Code Playgroud)
这适用于单个任务。但是,我不知道如何调整它以创建动态周期性任务。我的日程数据因用户的表单输入而异。我必须在运行时创建周期性任务。
根据有关定期任务的官方文档,crontab 计划是我需要的:
from celery.schedules import crontab
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
Run Code Online (Sandbox Code Playgroud)
虽然这看起来不错,但它位于带有硬编码时间表的 celery 配置中。
我还阅读了有关on_after_finalize.connect装饰器的信息,我可以在其中执行以下操作:
@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10.0, task123.s('hello'))
Run Code Online (Sandbox Code Playgroud)
但我不知道如何将时间表传递给这个函数。另外,发件人是什么?我可以从我的角度通过它吗?
然后我阅读了有关在 celery beat 中填充相关模型的信息。但我想必须有一种更优雅的方式,使用没有过时装饰器的稳定版本。
谢谢你。
| 归档时间: |
|
| 查看次数: |
576 次 |
| 最近记录: |