编写和运行任务 DAG 最简洁的方法是什么?

rkz*_*_io 5 python concurrency directed-acyclic-graphs

我想编写并运行一个有向无环图 (DAG),其中多个任务以串行或并行方式运行。理想情况下它看起来像:

def task1():
    # ...

def task2():
    # ...

graph = Sequence([
    task1,
    task2,
    Parallel([
        task3,
        task4
    ]),
    task5
]

graph.run()
Run Code Online (Sandbox Code Playgroud)

它将运行 1 -> 2 ->(同时运行 3 和 4)-> 5。任务需要访问全局范围来存储结果、写入日志和访问命令行参数。

我的用例是编写部署脚本。并行任务是 IO 密集型的:通常等待远程服务器完成步骤。

我研究了线程、asyncio、Airflow,但没有找到任何简单的库可以允许在没有一些样板代码的情况下遍历和控制图形的执行。有这样的东西存在吗?

Car*_*ate 2

这是一个快速的概念验证实施。它可以像这样使用:

graph = sequence(
            lambda: print(1),
            lambda: print(2),
            parallel(
                lambda: print(3),
                lambda: print(4),
                sequence(
                    lambda: print(5),
                    lambda: print(6))),
             lambda: print(7)

graph()

1
2
3
5
6
4
7
Run Code Online (Sandbox Code Playgroud)

sequence生成一个包装循环的函数for,并parallel生成一个包装线程池使用的函数:

from typing import Callable
from multiprocessing.pool import ThreadPool

Task = Callable[[], None]

_pool: ThreadPool = ThreadPool()

def sequence(*tasks: Task) -> Task:
    def run():
        for task in tasks:
            task()

    return run  # Returning "run" to be used as a task by other "sequence" and "parallel" calls

def parallel(*tasks: Task) -> Task:
    def run():
        _pool.map(lambda f: f(), tasks)  # Delegate to a pool used for IO tasks

    return run
Run Code Online (Sandbox Code Playgroud)

每次调用sequenceparallel返回一个新的“任务”(一个不带参数且不返回任何内容的函数)。然后,该任务可以由其他外部调用来调用sequenceparallel

需要注意的事项ThreadPool

  • 虽然这确实使用了线程池parallel,但由于 GIL,这仍然一次只会执行一件事。这parallel对于 CPU 密集型任务来说本质上是无用的。

  • 我没有指定池应该从多少个线程开始。我认为它默认为您可用的核心数量。如果您想要更多,您可以使用第一个参数指定要开始的数量ThreadPool

  • 为了简洁起见,我不会清理ThreadPool. 如果你使用这个,你绝对应该这样做。

  • 尽管ThreadPool是 的一部分multiprocessing,但令人困惑的是它使用线程而不是进程。