从应在 Airflow 中按顺序运行的函数返回任务列表

Fra*_*ans 2 airflow google-cloud-composer

我想从一个函数返回 2 个或更多任务,这些任务应该在它们插入依赖项的位置按顺序运行,请参见下文。

t1 = PythonOperator()

def generate_tasks():
    t2 = PythonOperator()
    t3 = PythonOperator()
    return magic(t2, t3) # magic needed here (preferably)

t1 >> generate_tasks() # otherwise here
# desired result: t1 >> t2 >> t3
Run Code Online (Sandbox Code Playgroud)

这可行吗?据我了解,Airflow 2.0 似乎通过 TaskGroup 实现了这一点,但我们使用的是 Google 的 Composer,2.0 暂时不会可用。

我发现的最佳解决方法:

t1 = PythonOperator()

def generate_tasks():
    t2 = PythonOperator()
    t3 = PythonOperator()
    return [t2, t3]

tasks = generate_tasks()
t1 >> tasks[0] >> tasks[1]
Run Code Online (Sandbox Code Playgroud)

但我真的希望将其抽象化,因为它或多或少违背了从单个函数返回多个运算符的目的。我们希望它是最终用户所知的单个单元,即使它可以由 2 个或更多任务组成。

如何使用 Airflow 2.0 中的 TaskGroup 来完成此操作:

class Encryptor:
    def encrypt_and_archive(self):
        with TaskGroup("archive_and_encrypt") as section_1:
            encrypt = DummyOperator(task_id="encrypt")
            archive = BashOperator(task_id="archive", bash_command='echo 1')
            encrypt >> archive
        return section_1

with DAG(dag_id="example_return_task_group", start_date=days_ago(2), tags=["example"]) as dag:
    start = DummyOperator(task_id="start")
    encrypt_and_archive = Encryptor().encrypt_and_archive()
    end = DummyOperator(task_id='end')

             #  single variable, containing two tasks
    start >> encrypt_and_archive >> end
Run Code Online (Sandbox Code Playgroud)

这将创建以下图表:

图表中的任务组

2.0之前有类似的远程操作吗?

Ela*_*lad 5

你没有解释什么 magic(t2, t3)是。TaskGroup 严格来说是 UI 功能,它不会影响 DAG 逻辑。根据您的描述,您似乎正在寻找特定的逻辑(否则是什么magic?)。

我相信这就是您所追求的:

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 24),
}
def generate_tasks():
    operator_list =[]
    for i in range(5): # Replace to generate the logic you wish to dynamically create tasks
        op = DummyOperator(task_id=f"t{str(i)}_task", dag=dag)
        if i>0:
            operator_list[i - 1] >> op
        operator_list.append(op)
    return operator_list

with DAG(
    dag_id='loop',
    default_args=default_args,
    schedule_interval=None,
) as dag:
    start_op = DummyOperator(task_id='start_task')
    end_op = DummyOperator(task_id='end_task')
    tasks = generate_tasks()
    start_op >> tasks[0]
    tasks[-1] >> end_op
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

您可以将 DummyOperator 替换为您想要的任何运算符。