运行时添加到 DAG 的任务无法调度

Ged*_*nas 2 airflow apache-airflow airflow-scheduler

我的想法是有一个foo生成输入列表(用户、报告、日志文件等)的任务,并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。

因此,理想情况下,我的 DAG 应如下所示: 在此处输入图片说明

这里唯一的变量是生成的任务数量。在完成所有这些任务后,我想再做一些任务,因此为每个任务启动一个新的 DAG 似乎并不合适。

这是我的代码:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1)
}

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)

foo_operator = BashOperator(
    task_id='foo',
    bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
    xcom_push=True,
    dag=dag)

def gen_nodes(**kwargs):
    ti = kwargs['ti']
    workers = json.loads(ti.xcom_pull(task_ids='foo'))

    for wid in workers:
        print("Iterating worker %s" % wid)
        op = PythonOperator(
            task_id='test_op_%s' % wid,
            python_callable=lambda: print("Dynamic task!"),
            dag=dag
        )

        op.set_downstream(bar_operator)
        op.set_upstream(dummy_op)

gen_subdag_node_op = PythonOperator(
    task_id='gen_subdag_nodes',
    python_callable=gen_nodes,
    provide_context=True,
    dag=dag
)

gen_subdag_node_op.set_upstream(foo_operator)

dummy_op = DummyOperator(
    task_id='dummy',
    dag=dag
)

dummy_op.set_upstream(gen_subdag_node_op)

bar_operator = DummyOperator(
    task_id='bar',
    dag=dag)

bar_operator.set_upstream(dummy_op)
Run Code Online (Sandbox Code Playgroud)

在日志中,我可以看到它gen_nodes被正确执行(即Iterating worker 5,等等)。但是,新任务没有被调度,也没有证据表明它们被执行了。

我在网上找到了相关的代码示例,例如this,但无法使其工作。我错过了什么吗?

或者,是否有更合适的方法来解决这个问题(隔离工作单元)?

jhn*_*lvr 5

目前,气流不支持在 dag 运行时添加/删除任务。

工作流顺序将是在 dag 运行开始时评估的任何内容。

请参阅此处的第二段。

这意味着您不能根据运行中发生的事情添加/删除任务。您可以根据与运行无关的内容在 for 循环中添加 X 任务,但在运行开始后,不会更改工作流形状/顺序。

很多时候,您可以BranchPythonOperator在 dag 运行期间使用 a来做出决定(这些决定可以基于您的xcom值),但它们必须是在工作流程中已经存在的分支下进行的决定。

达格运行,达格定义在气流中不是完全直观的方式分离,但更多或DAG运行(内部产生所创建少任何东西/ xcomdag_run.conf等)是不用于限定DAG本身可用。