根据上游任务的输出在气流中生成动态任务

Ami*_*mar 7 airflow

如何根据上游任务返回的列表动态生成任务。

我尝试了以下方法:

使用外部文件从列表中写入和读取 - 此选项有效,但我正在寻找更优雅的解决方案。

Xcom 拉进一个 subdag 工厂。这是行不通的。我能够将一个列表从上游任务传递给一个 subdag,但是 xcom 只能在 subdag 的任务内部访问,并且不能用于循环/迭代返回的列表并生成任务。例如 subdag 工厂方法。

 def subdag1(parent_dag_name, child_dag_name, default_args,**kwargs):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=default_args,
        schedule_interval="@once",
    )
    list_files='{{ task_instance.xcom_pull( dag_id="qqq",task_ids="push")}}'
    for newtask in list_files:
        BashOperator(
            task_id='%s-task-%s' % (child_dag_name,   'a'),
            default_args=default_args,
            bash_command= 'echo '+ list_files + newtask,
            dag=dag_subdag,
        )
    return dag_subdag
Run Code Online (Sandbox Code Playgroud)