如何在运行任务之前设置Celery来调用自定义初始化函数?

xel*_*elk 18 python django daemon amqp celery

我有一个Django项目,我正在尝试使用Celery提交后台处理任务(http://ask.github.com/celery/introduction.html).Celery与Django很好地集成,我已经能够提交我的自定义任务并获得结果.

唯一的问题是我找不到在守护进程中执行自定义初始化的合理方法.在开始处理任务之前,我需要调用一个加载大量内存的昂贵函数,而且每次都无法调用该函数.

以前有人有这个问题吗?任何想法如何解决它而不修改Celery源代码?

谢谢

ask*_*sol 16

您可以编写自定义加载程序,也可以使用信号.

加载器具有该on_task_init方法,该任务即将执行时on_worker_init调用,并由celery + celerybeat主进程调用.

使用信号可能是最简单的,可用的信号是:

0.8.4:

  • task_prerun(task_id, task, args, kwargs)

    在任务即将由worker执行时调度(或在本地使用apply/或if CELERY_ALWAYS_EAGER已设置)时调度.

  • task_postrun(task_id, task, args, kwargs, retval) 在与上述相同的条件下执行任务后调度.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    应用任务时调用(不适合长时间运行)

0.9.x中可用的附加信号(github上的当前主分支):

  • worker_init()

    在celeryd启动时调用(在任务初始化之前调用,因此如果在系统支持上fork,任何内存更改都将复制到子工作进程).

  • worker_ready()

    当celeryd能够接收任务时调用.

  • worker_shutdown()

    当芹菜关闭时调用.

这是一个在计算过程中第一次运行任务时预先计算内容的示例:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2
tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table: # it's empty, so haven't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Run Code Online (Sandbox Code Playgroud)

如果您希望为所有任务运行该函数,只需跳过该sender参数即可.