Kri*_*ber 6 django celery celery-task celerybeat
我正在尝试学习如何使用 celery 在我的一个模型上每天检查日期。我的一个模型包含一个到期日期和一个布尔字段,表明他们的保险是否已过期。
模型很大,所以我要发布一个精简版。我想我有两个选择。要么在模型方法上运行 celery 任务,要么在我的 tasks.py 中重写该函数。然后我需要使用 Celery beat 来运行日程表来每天检查。
我有这个功能可以工作,但我直接传递了我认为错误的模型对象。
我也遇到了如何在 celery.py 中的 celery beat 调度程序中使用 args 的问题。
我真的很接近让这个工作,但我想我会以错误的方式执行任务。我认为在模型方法上执行任务可能是最干净的,我只是不确定如何完成它。
模型.py
class CarrierCompany(models.Model):
name = models.CharField(max_length=255, unique=True)
insurance_expiration = models.DateTimeField(null=True)
insurance_active = models.BooleanField()
def insurance_expiration_check(self):
if self.insurance_expiration > datetime.today().date():
self.insurance_active = True
self.save()
print("Insurance Active")
else:
self.insurance_active = False
self.save()
print("Insurance Inactive")
Run Code Online (Sandbox Code Playgroud)
任务.py
from __future__ import absolute_import, unicode_literals
from celery.decorators import task
from datetime import datetime, date
from django.utils import timezone
from .models import CarrierCompany
@task(name="insurance_expired")
def insurance_date():
carriers = CarrierCompany.objects.all()
for carrier in carriers:
date = datetime.now(timezone.utc)
if carrier.insurance_expiration > date:
carrier.insurance_active = True
carrier.save()
print("Insurance Active")
else:
carrier.insurance_active = False
carrier.save()
print("Insurance Inactive")
Run Code Online (Sandbox Code Playgroud)
芹菜.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')
app = Celery('POTRTMS')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
app.conf.beat_schedule = {
'check-insurance-daily': {
'task': 'insurance_expired',
'schedule': crontab(hour='8')
},
}
Run Code Online (Sandbox Code Playgroud)
*** 更新了节拍时间表以反映我真正想要运行它的时间。
我如何做到这一点的一个例子如下。另外,如果您在 Django 应用程序中包含时区,而不是使用传统的日期时间,您可能需要使用此处找到的时区库。
模型.py
class CarrierCompany(models.Model):
...
@property
def is_insurance_expired(self):
from django.utils import timezone
if self.insurance_expiration > timezone.datetime.today():
print("Insurance Active")
return True
else:
print("Insurance Active")
return False
Run Code Online (Sandbox Code Playgroud)
任务.py
def insurance_date():
carriers = CarrierCompany.objects.all()
for carrier in carriers:
if carrier.is_insurance_expired:
carrier.insurance_active = True
carrier.save()
else:
carrier.insurance_active = False
carrier.save()
Run Code Online (Sandbox Code Playgroud)
您还可以做其他事情,例如如果它为 False,则不更新它并使其默认为 False,或者在 True 情况下反之亦然。您也可以在模型函数中完成所有这些操作,尽管我个人喜欢将逻辑保持一点独立(只是我如何组织我的东西)。希望这可以帮助您摆脱困境。
| 归档时间: |
|
| 查看次数: |
4849 次 |
| 最近记录: |