芹菜大块内链

use*_*563 10 python task celery

我想在celery chain命令中使用块.

chain = task1.s(arg1) | task2.chunks(?,CHUNK_SIZE) | task3.chunks(?, CHUNK_SIZE)
Run Code Online (Sandbox Code Playgroud)

基本上我想要做的是运行task1,将其结果块并发送到task2,然后task2应该调用task3,task3也应该从task2接收chunked结果来完成该过程.为什么?因为task1和task2都可以返回相当数量的项目,我想在更多批次中处理这些项目.

上面的代码不起作用,因为我不太确定要放置什么而不是问号以使其工作.

我不太确定这甚至是可能的,因为搜索没有提供太多结果,所以在这种情况下不可能构建这样的工作流程,我会对合理的替代方案感兴趣.

ari*_*eet 2

我不确定这对于现有的原语是否可行。

我可以考虑是否有两种替代方案/解决方法:

  1. 使用块/和弦从任务中启动新任务。

    你一定已经想到了这一点。这个想法是正常调用task1 apply_async。一旦该任务完成生成需要分块的大量输出,只需使用原语进一步为任务 2 创建块。同样,对task2和task3之间的转换执行相同的步骤。当您最终等待获取内部任务的结果时,从任务内部调用任务只是一个坏主意。因此请记住,如果您正在等待任务结果,那么这不是推荐的方法。

    @task
    def task1(some_input):
        # Do stuff
        # Create a list of lists where the inner list represent the *args to send to an individual task
        task2.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async()
    
    @task
    def task2(a, b):
        # Do stuff
        # Create a list of lists where the inner list represent the *args to send to an individual task
        task3.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async()
    
    @task
    def task3(a, b):
        # Do stuff
    
    Run Code Online (Sandbox Code Playgroud)
  2. 这个解决方案有点有趣。我在 celery Github 问题页面上遇到了一个特定的请求。查看来自 steeve 的拉取请求:https://github.com/celery/celery/pull/817 根据我的理解,他创建了一个动态任务装饰器(关于名称是否应该如此存在争论),其中了解任务是否返回子任务。如果是这样,它首先应用该子任务。他声称他在 Veezio 的生产中成功地使用了它。我自己还没有尝试过。我建议转到该线程并问几个问题。或者甚至通过 Twitter 或 IRC 之类的方式骚扰 Steve。