Python Celery 与 now 函数进行任务调度的 cronjob 计时不一致

Esp*_*azi 7 python cron scheduling celery celerybeat

情况

我有一个芹菜任务,我在不同的时区为每个客户运行。

基本上,对于数据库中的每个客户,我都会获取时区,然后以这种方式设置 celery 任务。

'schedule': crontab(minute=30, hour=14, nowfun=self.now_function)
Run Code Online (Sandbox Code Playgroud)

基本上,我想要的是在客户时区 14:30 运行的任务。因此就有了 now_function。

我的now_function只是获取客户时区的当前时间。

def now_function(self):
    """
    return the now function for the task
    this is used to compute the time a task should be scheduled for a given customer
    """
    return datetime.now(timezone(self.customer.c_timezone))
Run Code Online (Sandbox Code Playgroud)

怎么了

我的任务运行时间不一致,有时它们在预期时间运行,所以假设客户时区是 14:30,如果时区是America/Chicago20:30,那么这是我的预期行为。

有时,它在 14:30 运行,这正是 UTC 时间。

我正在跟踪任务在正确时间运行的当天和卡片在错误时间运行的当天是否存在某种模式。

附加信息

我已在 celery 4.4.2 和 5.xx 上尝试过此操作,但它仍然具有相同的行为。

这是我的芹菜配置。

CELERY_REDIS_SCHEDULER_URL = redis_instance_url
logger.debug("****** CELERY_REDIS_SCHEDULER_URL: ", CELERY_REDIS_SCHEDULER_URL)
logger.debug("****** environment: ", environment)
redbeat_redis_url = CELERY_REDIS_SCHEDULER_URL
broker_url = CELERY_REDIS_SCHEDULER_URL
result_backend = CELERY_REDIS_SCHEDULER_URL
task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = ['pickle']
enable_utc = False
task_track_started = True
task_send_sent_event = True
Run Code Online (Sandbox Code Playgroud)

您可以注意到enable_utc设置为False

  • 我正在使用 AWS 的 Redis 实例来运行我的任务。
  • 我正在使用此包RedBeatScheduler中的调度程序来安排我的任务。

如果有人遇到过这个问题或者可以帮助我重现它,我将非常感激。

其他编辑:

  • 我有另一个 cron 同时执行相同的工作,但每周和每月运行,但它们工作得很好。
weekly_schedule : crontab(minute=30, hour=14, nowfun=self.now_function, day_of_week=1)
monthly_schedule : crontab(minute=30, hour=14, nowfun=self.now_function, day_of_month=1)
Run Code Online (Sandbox Code Playgroud)

示例项目

如果您想运行并重现该问题,这里有GitHub 上的示例项目。

aar*_*ron 3

RedBeat 的编码器和解码器不支持nowfun.
源代码:https://github.com/sibson/redbeat/blob/e6d72e2/redbeat/decoder.py#L94-L102
您看到的行为之前已经描述过:sibson/redbeat#192(评论 756397651)

您可以子类化并替换RedBeatJSONDecoderRedBeatJSONEncoder

由于nowfun必须是 JSON 可序列化,我们只能支持一些特殊情况,
例如nowfun=partial(datetime.now, tz=pytz.timezone(self.customer.c_timezone))

from datetime import datetime
from functools import partial

from celery.schedules import crontab
import pytz
from pytz.tzinfo import DstTzInfo
from redbeat.decoder import RedBeatJSONDecoder, RedBeatJSONEncoder


class CustomJSONDecoder(RedBeatJSONDecoder):
    def dict_to_object(self, d):
        if '__type__' not in d:
            return d

        objtype = d.pop('__type__')

        if objtype == 'crontab':
            if d.get('nowfun', {}).get('keywords', {}).get('zone'):
                d['nowfun'] = partial(datetime.now, tz=pytz.timezone(d.pop('nowfun')['keywords']['zone']))
            return crontab(**d)

        d['__type__'] = objtype

        return super().dict_to_object(d)


class CustomJSONEncoder(RedBeatJSONEncoder):
    def default(self, obj):
        if isinstance(obj, crontab):
            d = super().default(obj)
            if 'nowfun' not in d and isinstance(obj.nowfun, partial) and obj.nowfun.func == datetime.now:
                zone = None
                if obj.nowfun.args and isinstance(obj.nowfun.args[0], DstTzInfo):
                    zone = obj.nowfun.args[0].zone
                elif isinstance(obj.nowfun.keywords.get('tz'), DstTzInfo):
                    zone = obj.nowfun.keywords['tz'].zone
                if zone:
                    d['nowfun'] = {'keywords': {'zone': zone}}
            return d

        return super().default(obj)
Run Code Online (Sandbox Code Playgroud)

替换以下类redbeat.schedulers

from redbeat import schedulers

schedulers.RedBeatJSONDecoder = CustomJSONDecoder
schedulers.RedBeatJSONEncoder = CustomJSONEncoder
Run Code Online (Sandbox Code Playgroud)