如何将一系列任务路由到芹菜中的特定队列?

mpa*_*paf 12 python rabbitmq celery chain

当我将任务路由到特定队列时,它可以工作:

task.apply_async(queue='beetroot')
Run Code Online (Sandbox Code Playgroud)

但如果我创建一个链:

chain = task | task
Run Code Online (Sandbox Code Playgroud)

然后我写

chain.apply_async(queue='beetroot')
Run Code Online (Sandbox Code Playgroud)

它似乎忽略了queue关键字并分配给默认的'celery'队列.

如果芹菜支持链中的路由会很好 - 所有任务在同一队列中按顺序执行.

小智 15

我是这样做的:

subtask = task.s(*myargs, **mykwargs).set(queue=myqueue)
mychain = celery.chain(subtask, subtask2, ...)
mychain.apply_async()
Run Code Online (Sandbox Code Playgroud)

  • 同一链中的不同子任务可以分配不同的队列吗? (2认同)

mpa*_*paf 10

好的,我弄清楚了这一点.

您必须将所需的执行选项(如queue =或countdown =)添加到子任务定义中,或者通过部分:

子任务定义:

from celery import subtask

chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot')
Run Code Online (Sandbox Code Playgroud)

部分:

chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot')
Run Code Online (Sandbox Code Playgroud)

然后通过以下方式执行链:

chain.apply_async()
Run Code Online (Sandbox Code Playgroud)

要么,

chain.delay()
Run Code Online (Sandbox Code Playgroud)

并且任务将被发送到"甜菜根"队列.最后一个命令中的额外执行参数不会执行任何操作.将所有这些执行参数应用于Chain(或Group,或任何其他Canvas原语)级别会更好.

  • 嗯,那部分例子对我不起作用,我得到了以下错误:TypeError:不支持的操作数类型为|:'AsyncResult'和'AsyncResult'(使用3.0.23) (2认同)

Pri*_*shC 6

这很晚了,但是我不认为@mpaf提供的代码是完全正确的。

上下文:就我而言,我有两个子任务,第一个子任务提供一个返回值,该返回值作为输入参数传递给第二个任务。我在执行第二个任务时遇到了麻烦-我在日志中看到Celery将第二个任务确认为第一个任务的回调,但它永远不会执行第二个任务。

这是我的非工作链代码-:

from celery import chain

chain(
    module.task1.s(arg),
    module.task2.s()
).apply_async(countdown=0.1, queue='queuename')
Run Code Online (Sandbox Code Playgroud)

使用@mpaf答案中提供的语法,我让两个任务都可以执行,但是执行顺序很随意,第二个子任务未被确认为第一个子任务的回调。我想到了浏览文档的方式,以了解如何在子任务上显式设置队列。

这是工作代码-

chain(
    module.task1.s(arg).set(queue='queuename'),
    module.task2.s().set(queue='queuename')
).apply_async(countdown=0.1)
Run Code Online (Sandbox Code Playgroud)