我有一个简单的 python 运算符,定义如下:
loop_records = PythonOperator(
task_id = 'loop_records',
provide_context = True,
python_callable = loop_topic_records,
dag = dag
)
Run Code Online (Sandbox Code Playgroud)
这个 python 操作符调用loop_topic_records,定义如下:
def loop_topic_records(**context):
parent_dag = context['dag']
for i in range(3):
op = DummyOperator(
task_id="child_" + str(i),
dag=parent_dag
)
logging.info('Child operator ' + str(i))
loop_records >> op
Run Code Online (Sandbox Code Playgroud)
我看到代码没有引发任何错误。它甚至打印Child operator 0..2在日志中。但是,在 dag 中Graph view我没有看到子操作符,我只看到loop_records节点,就好像我的 dag 只包含一个操作符一样。那么,这有什么问题呢?我该如何解决?
你不能为所欲为。每个 DAG 一旦被 Airflow 加载,就是静态的,并且不能从正在运行的任务中改变。您从任务内部对 DAG 所做的任何更改都将被忽略。
您可以做的是,使用插件提供的 Multi DAG 运行运算符启动其他 DAG;创建 DAG 的 DAG,可以这么说:airflow_multi_dagrun
from airflow.operators.dagrun_operator import DagRunOrder
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator
def gen_topic_records(**context):
for i in range(3):
# generate `DagRunOrder` objects to pass a payload (configuration)
# to the new DAG runs.
yield DagRunOrder(payload={"child_id": i})
logging.info('Triggering topic_record_dag #%d', i)
loop_topic_record_dags = TriggerMultiDagRunOperator(
task_id='loop_topic_record_dags',
dag=dag,
trigger_dag_id='topic_record_dag',
python_callable=gen_topic_records,
)
Run Code Online (Sandbox Code Playgroud)
以上将触发一个名为的 DAGtopic_record_dag启动,3 次。在该 DAG 中的运算符内部,您可以通过dag_run.conf对象(在模板中)或context['dag_run'].conf引用(在PythonOperator()代码中,使用provide_context=Trueset)访问任何设置为有效负载的内容。
如果在完成这 3 个 DAG 后您需要做其他工作,您只需要在上述 DAG 中添加一个传感器即可。传感器是等待特定外部信息可用的操作员。在这里使用一个在所有子 DAG 完成时触发的。同一个插件有一个MultiDagRunSensor正是你在这里需要的,它会在TriggerMultiDagRunOperator任务启动的所有 DAG完成(成功或失败)时触发:
from airflow import DAG
from airflow.operators.multi_dagrun import MultiDagRunSensor
wait_for_topic_record_dags = MultiDagRunSensor(
task_id='wait_for_topic_record_dags',
dag=dag
)
loop_topic_record_dags >> wait_for_topic_record_dags
Run Code Online (Sandbox Code Playgroud)
然后在该传感器之后放置更多操作员。
| 归档时间: |
|
| 查看次数: |
1361 次 |
| 最近记录: |