Celery 动态工作流

Ahm*_*DAL 5 python celery

我正在使用Celery版本3.1.17

通常情况下,你可以用像帆布模块,芹菜准备自己的静态工作流程chaingroupchord或者干脆linking tasks。您可以访问任何结果或任何任务属性,例如工作流中任何任务的任务 ID。您必须预先定义您的任务。

我通过在我的工作流中调用子任务来执行动态子任务。例如,我调用一个任务可能是一个画布模块,他们动态地决定逻辑并尝试根据该决定调用子任务。但是在该解决方案中,我的静态工作流任务和动态子任务之间没有父/子关系。我无法追踪他们。这真的很令人沮丧。这是我目前无法使用的方式;

class ParentTask(Task):
    def run(self, *args, **kwargs):
        SubTask().subtask(args=(1, 2), countdown=1).apply_async()

class SubTask(Task):
    def run(self, x, y, *args, **kwargs):
        return x+y

non_tracable_for_subtask_result = ParentTask().delay()
Run Code Online (Sandbox Code Playgroud)

我需要一个可以在我的工作流程中的任务中动态扩展的画布模块(组、和弦等)。我可以chord,group, etc.在运行时将新的子任务动态链接到我当前的工作流 ( ) 吗?我想要类似的东西;

// THIS CODE DOES NOT WORK, JUST TO EXPLAIN REQUIREMENT
class ParentTask(Task):
    def run(self, *args, **kwargs):
        count = get_count()
        sub_task=SubTask().subtask(args=(1, 2), countdown=1)
        for i in range(count):
            //It could be like. THIS PART WHAT I LOOK FOR
            self.link(sub_task)

class SubTask(Task):
    def run(self, x, y, *args, **kwargs):
        return x+y

>>> tracable_for_subtask_result = ParentTask().delay()
>>> tracable_for_subtask_result.children.get()
3
>>> tracable_for_subtask_result.children.id
.....
Run Code Online (Sandbox Code Playgroud)

Kir*_*hxh 0

您可以使用 来表达动态工作流程task.replace。这里有一些例子:https ://github.com/celery/celery/issues/3437

你可以这样写:

class ParentTask(Task):
def run(self, *args, **kwargs):
    count = get_count()
    sub_tasks=[subtask.s(i) for i in range(count)]
    g = group(*sub_tasks) | after_sub_tasks.s(self.request.id).on_error(after_sub_tasks_error.s(self.request.id))
    return self.replace(g) # current task is replaced with subtasks without blocking current worker

@app.task(bind=True, name='after_sub_task')
def after_sub_tasks.(self, results, main_task_id):
    print(f"results {results}")
    print(f"main_task_id {main_task_id}")
    return True

@app.task(bind=True, name='nx.delayed_task_error')
def after_sub_tasks_error(self, failed_task_id, main_task_id):
    # no results available here... sigh
    print(f"main_task_id {main_task_id}")
    raise ValueError("Part of subtasks failed")
Run Code Online (Sandbox Code Playgroud)