Airflow:如何从 PythonOperator 中的 python_callable 内部创建子运算符

Jac*_*ian 3 python airflow

我有一个简单的 python 运算符,定义如下:

loop_records = PythonOperator(
    task_id = 'loop_records',
    provide_context = True,
    python_callable = loop_topic_records,
    dag = dag    
)
Run Code Online (Sandbox Code Playgroud)

这个 python 操作符调用loop_topic_records,定义如下:

def loop_topic_records(**context):
    parent_dag = context['dag']
    for i in range(3):
        op = DummyOperator(
            task_id="child_" + str(i),
            dag=parent_dag
        )
        logging.info('Child operator ' + str(i))
        loop_records >> op
Run Code Online (Sandbox Code Playgroud)

我看到代码没有引发任何错误。它甚至打印Child operator 0..2在日志中。但是,在 dag 中Graph view我没有看到子操作符,我只看到loop_records节点,就好像我的 dag 只包含一个操作符一样。那么,这有什么问题呢?我该如何解决?

Mar*_*ers 5

你不能为所欲为。每个 DAG 一旦被 Airflow 加载,就是静态的,并且不能从正在运行的任务中改变。您从任务内部对 DAG 所做的任何更改都将被忽略

可以做的是,使用插件提供的 Multi DAG 运行运算符启动其他 DAG;创建 DAG 的 DAG,可以这么说:airflow_multi_dagrun

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator

def gen_topic_records(**context):
    for i in range(3):
        # generate `DagRunOrder` objects to pass a payload (configuration)
        # to the new DAG runs.
        yield DagRunOrder(payload={"child_id": i})
        logging.info('Triggering topic_record_dag #%d', i)

loop_topic_record_dags = TriggerMultiDagRunOperator(
    task_id='loop_topic_record_dags',
    dag=dag,
    trigger_dag_id='topic_record_dag',
    python_callable=gen_topic_records,
)
Run Code Online (Sandbox Code Playgroud)

以上将触发一个名为的 DAGtopic_record_dag启动,3 次。在该 DAG 中的运算符内部,您可以通过dag_run.conf对象(在模板中)或context['dag_run'].conf引用(在PythonOperator()代码中,使用provide_context=Trueset)访问任何设置为有效负载的内容。

如果在完成这 3 个 DAG 后您需要做其他工作,您只需要在上述 DAG 中添加一个传感器即可。传感器是等待特定外部信息可用的操作员。在这里使用一个在所有子 DAG 完成时触发的。同一个插件有一个MultiDagRunSensor正是你在这里需要的,它会在TriggerMultiDagRunOperator任务启动的所有 DAG完成(成功或失败)时触发:

from airflow import DAG
from airflow.operators.multi_dagrun import MultiDagRunSensor

wait_for_topic_record_dags = MultiDagRunSensor(
    task_id='wait_for_topic_record_dags',
    dag=dag
)

loop_topic_record_dags >> wait_for_topic_record_dags
Run Code Online (Sandbox Code Playgroud)

然后在该传感器之后放置更多操作员。