如何动态添加/删除周期性任务到芹菜(celerybeat)

Jam*_*est 43 python celery celerybeat

如果我有一个定义如下的函数:

def add(x,y):
  return x+y
Run Code Online (Sandbox Code Playgroud)

有没有办法动态添加这个函数作为芹菜PeriodicTask并在运行时启动它?我希望能够做类似(伪代码)的事情:

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)

我还想用类似(伪代码)的东西动态地停止或删除该任务:

celery.beat.remove_task(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)

要么

celery.beat.stop(some_unique_task_id)
Run Code Online (Sandbox Code Playgroud)

仅供参考我没有使用djcelery,它允许您通过django管理员管理定期任务.

McP*_*McP 38

这个问题在google网上得到了解答.

我不是作者,所有功劳归功于Jean Mark

这是一个适当的解决方案.确认工作,在我的方案,我子类周期性任务,并创建了一个模型出来的,因为我可以添加其他领域的模型,因为我需要还,所以我可以添加"终止"的方法.您必须将定期任务的enabled属性设置为False并在删除之前保存它.整个子类不是必须的,schedule_every方法是真正完成工作的方法.当你准备终止您的任务(如果你没有继承它),你可以只使用PeriodicTask.objects.filter(名称= ...)来搜索你的任务,禁用它,然后将其删除.

希望这可以帮助!

from djcelery.models import PeriodicTask, IntervalSchedule
from datetime import datetime

class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
    """ schedules a task by name every "every" "period". So an example call would be:
         TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
         that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules: # just check if interval schedules exist like that already and reuse em
            interval_schedule = interval_schedules[0]
        else: # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every # should check to make sure this is a positive int
            interval_schedule.period = period 
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        if args:
            ptask.args = args
        if kwargs:
            ptask.kwargs = kwargs
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """pauses the task"""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """starts the task"""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
Run Code Online (Sandbox Code Playgroud)


ask*_*sol 19

不,我很抱歉,这对于常规的芹菜不可能.

但是它可以轻松扩展以执行您想要的操作,例如,django-celery调度程序只是一个子类读取和编写计划到数据库(顶部有一些优化).

即使对于非Django项目,您也可以使用django-celery调度程序.

像这样的东西:

还有一个djcelerymon命令可用于非Django项目在同一进程中启动celerycam和Django Admin webserver,您可以使用它在一个漂亮的Web界面中编辑您的周期性任务:

   $ djcelerymon
Run Code Online (Sandbox Code Playgroud)

(注意由于某种原因djcelerymon无法使用Ctrl + C停止,你必须使用Ctrl + Z + kill%1)

  • 2012年至2016年的任何变化? (8认同)

小智 5

有一个名为django-celery-beat的库,它提供了所需的模型。为了使其能够动态加载新的定期任务,必须创建自己的Scheduler。

from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()

        return self._schedule
Run Code Online (Sandbox Code Playgroud)


Tri*_*own 5

celery v4.1.0中包含的修复程序终于使这成为可能。现在,您只需要更改数据库后端中的日程表条目,celery-beat将根据新的日程表进行操作。

文档模糊地描述了它是如何工作的。celery-beat的默认调度程序PersistentScheduler使用一个搁置文件作为其调度数据库。实例中对beat_schedule字典的任何更改都将PersistentScheduler与此数据库同步(默认情况下,每3分钟同步一次),反之亦然。该文档描述了如何将新的条目添加beat_schedule使用app.add_periodic_task。要修改现有条目,只需添加具有相同条目的新条目name。就像从字典中删除条目一样:del app.conf.beat_schedule['name']

假设您想使用外部应用程序监视和修改您的芹菜节拍时间表。然后,您有几种选择:

  1. 您可以open搁置数据库文件并像字典一样读取其内容。写回该文件进行修改。
  2. 您可以运行Celery应用程序的另一个实例,然后使用该实例来修改搁置文件,如上所述。
  3. 您可以使用django-celery-beat中的自定义调度程序类将日程表存储在django管理的数据库中,并在其中访问条目。
  4. 您可以使用celerybeat-mongo中的调度程序将调度存储在MongoDB后端中,并在其中访问条目。

  • 例如,当我尝试以下操作时:“_gdbm.error:[Errno 11]资源暂时不可用”。所以看起来当 celery 运行时我似乎无法通过 `shelve.open(file)` 打开文件。 (3认同)