Esp*_*azi 7 python cron scheduling celery celerybeat
情况
我有一个芹菜任务,我在不同的时区为每个客户运行。
基本上,对于数据库中的每个客户,我都会获取时区,然后以这种方式设置 celery 任务。
'schedule': crontab(minute=30, hour=14, nowfun=self.now_function)
Run Code Online (Sandbox Code Playgroud)
基本上,我想要的是在客户时区 14:30 运行的任务。因此就有了 now_function。
我的now_function只是获取客户时区的当前时间。
def now_function(self):
"""
return the now function for the task
this is used to compute the time a task should be scheduled for a given customer
"""
return datetime.now(timezone(self.customer.c_timezone))
Run Code Online (Sandbox Code Playgroud)
怎么了
我的任务运行时间不一致,有时它们在预期时间运行,所以假设客户时区是 14:30,如果时区是America/Chicago20:30,那么这是我的预期行为。
有时,它在 14:30 运行,这正是 UTC 时间。
我正在跟踪任务在正确时间运行的当天和卡片在错误时间运行的当天是否存在某种模式。
附加信息
我已在 celery 4.4.2 和 5.xx 上尝试过此操作,但它仍然具有相同的行为。
这是我的芹菜配置。
CELERY_REDIS_SCHEDULER_URL = redis_instance_url
logger.debug("****** CELERY_REDIS_SCHEDULER_URL: ", CELERY_REDIS_SCHEDULER_URL)
logger.debug("****** environment: ", environment)
redbeat_redis_url = CELERY_REDIS_SCHEDULER_URL
broker_url = CELERY_REDIS_SCHEDULER_URL
result_backend = CELERY_REDIS_SCHEDULER_URL
task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = ['pickle']
enable_utc = False
task_track_started = True
task_send_sent_event = True
Run Code Online (Sandbox Code Playgroud)
您可以注意到enable_utc设置为False。
RedBeatScheduler中的调度程序来安排我的任务。如果有人遇到过这个问题或者可以帮助我重现它,我将非常感激。
其他编辑:
weekly_schedule : crontab(minute=30, hour=14, nowfun=self.now_function, day_of_week=1)
monthly_schedule : crontab(minute=30, hour=14, nowfun=self.now_function, day_of_month=1)
Run Code Online (Sandbox Code Playgroud)
示例项目
如果您想运行并重现该问题,这里有GitHub 上的示例项目。
RedBeat 的编码器和解码器不支持nowfun.
源代码:https://github.com/sibson/redbeat/blob/e6d72e2/redbeat/decoder.py#L94-L102
您看到的行为之前已经描述过:sibson/redbeat#192(评论 756397651)
您可以子类化并替换RedBeatJSONDecoder和RedBeatJSONEncoder。
由于nowfun必须是 JSON 可序列化,我们只能支持一些特殊情况,
例如nowfun=partial(datetime.now, tz=pytz.timezone(self.customer.c_timezone))
from datetime import datetime
from functools import partial
from celery.schedules import crontab
import pytz
from pytz.tzinfo import DstTzInfo
from redbeat.decoder import RedBeatJSONDecoder, RedBeatJSONEncoder
class CustomJSONDecoder(RedBeatJSONDecoder):
def dict_to_object(self, d):
if '__type__' not in d:
return d
objtype = d.pop('__type__')
if objtype == 'crontab':
if d.get('nowfun', {}).get('keywords', {}).get('zone'):
d['nowfun'] = partial(datetime.now, tz=pytz.timezone(d.pop('nowfun')['keywords']['zone']))
return crontab(**d)
d['__type__'] = objtype
return super().dict_to_object(d)
class CustomJSONEncoder(RedBeatJSONEncoder):
def default(self, obj):
if isinstance(obj, crontab):
d = super().default(obj)
if 'nowfun' not in d and isinstance(obj.nowfun, partial) and obj.nowfun.func == datetime.now:
zone = None
if obj.nowfun.args and isinstance(obj.nowfun.args[0], DstTzInfo):
zone = obj.nowfun.args[0].zone
elif isinstance(obj.nowfun.keywords.get('tz'), DstTzInfo):
zone = obj.nowfun.keywords['tz'].zone
if zone:
d['nowfun'] = {'keywords': {'zone': zone}}
return d
return super().default(obj)
Run Code Online (Sandbox Code Playgroud)
替换以下类redbeat.schedulers:
from redbeat import schedulers
schedulers.RedBeatJSONDecoder = CustomJSONDecoder
schedulers.RedBeatJSONEncoder = CustomJSONEncoder
Run Code Online (Sandbox Code Playgroud)