安排重复活动的提醒

len*_*nin 6 python scheduled-tasks celery recurring-events

我正在使用一个允许用户在日历上创建事件(一次性或重复性)的Web应用程序,并且在事件启动前不久,系统将通知其参与者.我在为这种通知设计流程时遇到了麻烦,特别是对于重复发生的事件.

需要考虑的事项:

  1. Web应用程序的体系结构使得有许多相同结构的数据库,每个数据库都有自己的用户和事件集.因此,针对一个数据库的任何查询都需要针对几千个其他人进行.
  2. 定期事件可能已排除日期(类似于RRULE和EXDATE组合).

  3. 用户可以更新事件的时间/重复规则.

  4. 该应用程序是用Python编写的,并且已经使用带有Redis代理的Celery 3.1.解决方案使用此设置会很好,但任何事情都可以.根据我的发现,目前很难用Celery动态添加周期性任务.

我正在尝试的解决方案:

  • 定期任务每天运行一次,扫描每个数据库并添加任务以在适当的时间为每天发生重复的事件发出通知.

  • 如上生成的每个任务都将其ID临时保存在Redis中.如果用户在安排通知任务后更改当天的事件时间,则该任务将被撤销并替换为新任务.

上述解决方案的示例代码:

  • tasks.py,所有要运行的任务:

    from celery.task import task as celery_task
    from celery.result import AsyncResult
    from datetime import datetime
    
    # ...
    
    @celery_task
    def create_notify_task():
        for account in system.query(Account):
            db_session = account.get_session()    # get sql alchemy session
            for event in db_session.query(Event):
                schedule_notify_event(account, partial_event)
    
    
    @celery_task(name='notify_event_users')
    def notify_event_users(account_id, event_id):
        # do notification for every event participant
        pass
    
    def schedule_notify_event(account, event):
        partial_event = event.get_partial_on(datetime.today())
        if partial_event:
            result = notify_event_users.apply_async(
                    args = (account.id, event.id),
                    eta = partial_event.start)
            replace_task_id(account.id, event.id, result.id)
        else:
            replace_task_id(account.id, event.id, None)
    
    def replace_task_id(account_id, event_id, result_id):
        key = '{}:event'.format(account_id)
        client = redis.get_client()
        old_result_id = client.hget(key, event_id)
        if old_result_id:
            AsyncResult(old_result_id).revoke()
        client.hset(key, event_id, result_id)
    
    Run Code Online (Sandbox Code Playgroud)
  • event.py:

    # when a user change event's time
    def update_event(event, data):
        # ...
        # update event
        # ...
        schedule_notify_event(account, event)
    
    Run Code Online (Sandbox Code Playgroud)
  • 芹菜安装文件:

    from celery.schedules import crontab
    
    CELERYBEAT_SCHEDULE = {
        'create-notify-every-day': {
            'task': 'tasks.create_notify_task',
            'schedule': crontab(minute=0, hour=0),
            'args': (,)
        },
    }
    
    Run Code Online (Sandbox Code Playgroud)

以上的一些缺点是:

  • 日常任务可能需要很长时间才能运行.最后处理的数据库中的事件必须等待,可能会被遗漏.提前调度该任务(例如,第二天前2小时)可以缓解这一问题,但是首次运行设置(或服务器重启后)有点尴尬.

  • 必须小心,以便通知任务不会为同一事件安排两次(例如,因为create_notify_task每天运行多次......).

对此有更合理的方法吗?

相关问题:

len*_*nin 5

已经很久没有答案了,我都忘了这个问题了。无论如何,当时我采用了以下解决方案。我在这里概述一下,以防有人感兴趣。

  • 当创建事件时,任务被安排在其下一次发生之前不久(即下一次通知时间)运行。计划时间是根据应用的所有重复规则和例外规则计算的,因此这只是 celery 的一个简单的计划一次性任务。
  • 当任务运行时,它会执行通知作业,并在下一个通知时间安排新任务(同样,考虑所有重复规则和异常规则)。如果没有下一个事件发生,则不会安排新任务。
  • 任务的 ID 与事件一起保存在数据库中。如果事件的时间发生更改,则任务将被取消,并会在新的下一个通知时间安排新任务。当任务运行并安排新任务时,新任务的 ID 会保存在数据库中。

我能想到的一些优点和缺点:

  • 优点:
    • 芹菜中不需要复杂的重复规则,因为任务只是为单次运行安排的。
    • 每个任务都相当小且快速,因为它只需要关心单个事件通知。
  • 缺点:
    • 任何时候,都有大量的celery定时任务等待执行,大概有几十万个量级。我不确定这如何影响芹菜的性能,所以它可能是也可能不是一个真正的骗局。到目前为止,系统似乎运行良好。