芹菜任务链和访问**kwargs

Ben*_*ite 12 python kwargs celery chain

我有类似于此处概述的情况,除了不是使用多个参数链接任务,我想链接返回具有多个条目的字典的任务.

这是 - 非常松散和抽象 - 我正在努力做的事情:

tasks.py

@task()
def task1(item1=None, item2=None):
  item3 = #do some stuff with item1 and item2 to yield item3
  return_object = dict(item1=item1, item2=item2, item3=item3)
  return return_object

def task2(item1=None, item2=None, item3=None):
  item4 = #do something with item1, item2, item3 to yield item4
  return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
  return return_object
Run Code Online (Sandbox Code Playgroud)

从ipython开始,我可以单独和异步地调用task1,没有任何问题.

我也可以单独调用task2,task1返回的结果为双星参数:

>>res1 = task1.s(item1=something, item2=something_else).apply_async()
>>res1.status
'SUCCESS'
>>res2 = task2.s(**res1.result).apply_async()
>>res2.status
'SUCCESS
Run Code Online (Sandbox Code Playgroud)

但是,我最终想要实现的是与上面相同的最终结果,但是通过链,在这里,我无法弄清楚如何将task2实例化而不是使用task1返回的(位置)参数,而是使用task1.result作为**kwargs:

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async()  #THIS DOESN'T WORK!
Run Code Online (Sandbox Code Playgroud)

我怀疑我可以返回并重写我的任务,以便它们返回位置参数而不是字典,这可能会清除,但在我看来应该有一些方法可以在task2中使用等效的方法访问task1的返回对象**双星的功能.我也怀疑我在这里遗漏了一些关于Celery子任务实现或*args与**kwargs相当明显的东西.

希望这是有道理的.并提前感谢任何提示.

Bal*_*rol 8

这是我对问题的看法,使用抽象的任务类:

from __future__ import absolute_import
from celery import Task
from myapp.tasks.celery import app   


class ChainedTask(Task):
    abstract = True    

    def __call__(self, *args, **kwargs):
        if len(args) == 1 and isinstance(args[0], dict):
            kwargs.update(args[0])
            args = ()
        return super(ChainedTask, self).__call__(*args, **kwargs)

@app.task(base=ChainedTask)
def task1(x, y):
    return {'x': x * 2, 'y': y * 2, 'z': x * y}    


@app.task(base=ChainedTask)
def task2(x, y, z):
    return {'x': x * 3, 'y': y * 3, 'z': z * 2}
Run Code Online (Sandbox Code Playgroud)

您现在可以定义和执行您的链:

from celery import chain

pipe = chain(task1.s(x=1, y=2) | task2.s())
pipe.apply_async()
Run Code Online (Sandbox Code Playgroud)


ask*_*sol 2

chain其他画布基元属于功能实用程序系列,例如mapreduce

例如,其中map(target, items)调用target(item)列表中的每个项目,Python 有一个很少使用的映射版本itertools.starmap,称为 ,而是调用target(*item)

虽然我们可以添加starchain甚至kwstarchain添加到工具箱中,但这些工具非常专业,可能不会经常使用。

有趣的是,Python 通过列表和生成器表达式使这些变得不必要,因此 map 被替换为[target(item) for item in item],starmap 被替换为[target(*item) for item in item]

因此,我认为我们不应该为每个原语实现多种替代方案,而应该专注于寻找一种更灵活的方式来支持这一点,例如使用芹菜驱动的生成器表达式(如果可能,并且如果不是类似强大的东西)