是否可以将自定义路线用于芹菜的画布基元?

CJ *_*net 3 celery django-celery

我有不同的Rabbit队列,每个队列专门用于一种特殊的订单处理:

# tasks.py

@celery.task
def process_order_for_product_x(order_id):
    pass  # elided ...


@celery.task
def process_order_for_product_y(order_id):
    pass  # elided ...


# settings.py

CELERY_QUEUES = {
    "black_hole": {
        "binding_key": "black_hole",
        "queue_arguments": {"x-ha-policy": "all"}
    },
    "product_x": {
        "binding_key": "product_x",
        "queue_arguments": {"x-ha-policy": "all"}
    },
    "product_y": {
        "binding_key": "product_y",
        "queue_arguments": {"x-ha-policy": "all"}
    },
Run Code Online (Sandbox Code Playgroud)

我们的政策是通过设置来强制执行显式路由CELERY_DEFAULT_QUEUE = 'black_hole',然后再从中使用black_hole

这些任务中的每一个都可以使用芹菜的画布基元,如下所示:

# tasks.py

@celery.task
def process_order_for_product_x(order_id):
    # These can run in parallel
    stage_1_group = group(do_something.si(order_id),
                          do_something_else.si(order_id))

    # These can run in parallel
    another_group = group(do_something_at_end.si(order_id),
                          do_something_else_at_end.si(order_id))

    # These run in a linear sequence
    process_task = chain(
        stage_1_group,
        do_something_dependent_on_stage_1.si(order_id),
        another_group)

    process_task.apply_async()
Run Code Online (Sandbox Code Playgroud)

假如我想的具体用途celery.groupcelery.chordcelery.chord_unlock,等帆布任务,通过队列流入其相应的产品,而不是被困在一black_hole,有没有办法来调用与任何自定义任务名称或自定义routing_key每个特定的画布的任务吗?

出于celery.*某种原因,我不愿讨论所有任务,而不希望将它们发送到所有celery_canvas队列,而这正是我在此期间所做的。

小智 5

此方法允许您将Celery画布任务路由到回调任务的队列。

这是可能的,描述指定基于类的自定义任务路由器芹菜这里

让我们专注于celery.chord_unlock任务。它的签名在这里定义。

def unlock_chord(self, group_id, callback, ...):
Run Code Online (Sandbox Code Playgroud)

第二个位置参数是和弦回调任务的签名。

Celery中的任务签名基本上是字典,因此使我们有机会访问任务选项,包括任务队列名称。

这是一个例子:

class CeleryRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'celery.chord_unlock':
            callback_signature = args[1]
            options = callback_signature.get('options')
            if options:
                queue = options.get('queue')
                if queue:
                    return {'queue': queue}
Run Code Online (Sandbox Code Playgroud)

将其添加到Celery配置中:

CELERY_ROUTES = (CeleryRouter(), 
Run Code Online (Sandbox Code Playgroud)