在运行之前绘制 Celery 画布

laz*_*zy1 5 python graph graphviz celery

有文档介绍如何在 Celery 中运行画布作业后生成图表。不过,我想在运行作业之前生成一个图表。

假设我创建了一个简单的链:

c = chain(add.s(1, 2), mul(4))
Run Code Online (Sandbox Code Playgroud)

如何生成链图?

谢谢,

三木

Ben*_*ier 4

我也有同样的愿望。在运行作业之前生成图表。所以我做了一些工作:)

看来芹菜不允许。其原因(至少我在尝试这样做时理解)是在图中每个节点都必须有一个唯一的名称。一旦画布被执行,这个唯一的名称就是 celery task_id,但在执行之前没有任何东西允许这样的区别。

所以解决方案是自己生成这个图,当然唯一地标识每个节点(为此计数器可以完成这项工作)。

这是这个函数的工作:

# -*- coding: utf-8 -*-
from celery.canvas import chain, group, Signature


def analyze_canvas(canvas):
    return _analyze_canvas(canvas)['dependencies']


def _analyze_canvas(canvas, previous=[], i=0):
    dependencies = []

    if isinstance(canvas, chain):
        for t in canvas.tasks:
            if not (isinstance(t, group) or isinstance(t, chain)):
                n = str(t) + " - (" + str(i) + ")"
                i += 1
                dependencies.append((n, previous))
                previous = [n]
            else:
                analysis = _analyze_canvas(t, previous, i)
                dependencies.extend(analysis['dependencies'])
                previous = analysis['previous']
    elif isinstance(canvas, group):
        new_previous = []
        for t in canvas.tasks:
            if not (isinstance(t, group) or isinstance(t, chain)):
                n = str(t) + " - (" + str(i) + ")"
                i += 1
                dependencies.append((n, previous))
                new_previous.append(n)
            else:
                analysis = _analyze_canvas(t, previous, i)
                dependencies.extend(analysis['dependencies'])
                new_previous = analysis['previous']
        previous = new_previous
    elif isinstance(canvas, Signature):
        n = str(t) + " - (" + str(i) + ")"
        i += 1
        dependencies.append((n, previous))
        previous = [n]
    return {"dependencies": dependencies,
            "previous": previous}
Run Code Online (Sandbox Code Playgroud)

它生成画布的依赖图。这个想法只是迭代画布的其他任务并识别组/链/签名以生成正确的依赖关系。

从此时起,您可以使用更多 celery 实用程序来生成点文件。这是一个小用法示例:

from celery_util import analyze_canvas
from celery.datastructures import DependencyGraph
from celery import Celery, group

app = Celery()

@app.task
def t1():
    pass

@app.task
def t2():
    pass

canvas = t1.si() | t2.si() | group(t1.si(), t1.si(), t2.si()) | t2.si()

d = analyze_canvas(canvas)
dg = DependencyGraph(it=d)
pipo = open("pipo.dot", "w+")
dg.to_dot(pipo)
Run Code Online (Sandbox Code Playgroud)

在这个例子中,我只是声明虚拟任务并将它们链接/分组在一个漂亮的画布中。我使用 celery utilDependencyGraph来获得对象表示以及将图形转储为点的能力,这是我使用to_dot方法所做的。

美丽的结果是: 任务图

  • 在 Celery 4 中,“signature.freeze()”可用于在执行之前获取“AsyncResult”。 (2认同)