在气流中启动具有可变并行任务的 subdag

Phi*_*ero 5 python workflow task airflow

我有一个我想修改的气流工作流程(参见底部的插图)。
但是,我在文档中找不到这样做的方法。

我在没有运气的情况下看过 subdags、branching 和 xcoms。

似乎没有一种方法可以根据操作员的返回来指定应该在 subdag 中并行运行多少个任务。
更重要的是,subdag 中的每个任务都会收到一个不同的参数(前一个运算符返回的列表中的一个元素)

这是我正在尝试做的事情的说明: 气流描述

Dan*_*ang 3

我也遇到过这个问题,但还没有真正找到一个干净的方法来解决它。如果您知道要传递给每个 subdag 的所有不同的可能参数...那么您可以将其硬编码到 DAG 文件中,并且始终使用每个可能的 subdag 创建 DAG。然后你有一个运算符(类似于你的“获取每个n”),它获取你想要运行的子达格列表,并让它将不在列表中的任何下游子达格标记为skipped。像这样的东西:

SUBDAGS = {
    'a': {'id': 'foo'},
    'b': {'id': 'bar'},
    'c': {'id': 'test'},
    'd': {'id': 'hi'},
}   

def _select_subdags(**context):
    names = fetch_list()  # returns ["a", "c", "d"]
    tasks_to_skip = ['my_subdag_' + name for name in set(SUBDAGS) - set(names)]

    session = Session()
    tis = session.query(TaskInstance).filter(
        TaskInstance.dag_id == context['dag'].dag_id, 
        TaskInstance.execution_date == context['ti'].execution_date,
        TaskInstance.task_id.in_(tasks_to_skip),
    )
    for ti in tis:
        now = datetime.utcnow()
        ti.state = State.SKIPPED
        ti.start_date = now
        ti.end_date = now
        session.merge(ti)
    session.commit()
    session.close()

select_subdags = PythonOperator(
    task_id='select_subdags',
    dag=dag,
    provide_context=True,
    python_callable=_select_subdags,
)

for name, params in SUBDAGS.iteritems():
    child_dag_id = 'my_subdag_' + name
    subdag_op = SubDagOperator(
        task_id=child_dag_id,
        dag=dag,
        subdag=my_subdag(dag.dag_id, child_dag_id, params),
    )
    select_subdags >> subdag_op
Run Code Online (Sandbox Code Playgroud)

显然这并不理想,尤其是当您最终只想运行数百个子选项中的一个时。我们还遇到了单个 DAG 中数千个子DAG 的一些性能问题,因为它可能会导致大量任务实例,其中大多数都被简单地跳过。