从一个 Airflow DAG 返回值到另一个

mrz*_*iak 5 python airflow apache-airflow-xcom

我的 DAG(我们称之为 DAG_A)使用trigger_dagrun操作符启动另一个 DAG (DAG_B) 。DAG_B 的任务使用 XCOM,我想在完成后从 DAG_B 运行的任务之一(正是我开始的任务)中获取 XCOM 值。

使用 XCOM 不是硬性要求 - 基本上 Airflow 本身提供的任何(合理的)机制都可以工作。如果需要,我可以更改 DAG_B。

找不到此类案例的任何示例,因此感谢您的帮助。

计划 B 是让 DAG_B 将 XCOM 值与一些运行 ID 一起保存到一些持久性存储(如 DB 或文件)中,DAG_A 将从那里获取它。但如果有一些内置机制可用,我想避免这种复杂化。

Mar*_*ers 4

可以通过传入dag_idto来从另一个 dag 中提取 XCOM 值xcom_pull()(请参阅task_instance.xcom_pull()函数文档)。只要您使用与当前 DAG 相同的执行日期触发 subdag,此操作就有效。这是通过模板化execution_date值轻松实现的:

trigger = TriggerDagRunOperator(
    task_id="trigger_dag_b",
    trigger_dag_id="DAG_B",
    execution_date="{{ execution_date }}",
    ...
)
Run Code Online (Sandbox Code Playgroud)

然后,如果您使用ExternalTaskSensor传感器等待特定任务完成或wait_for_completion=True在您的任务中使用,您稍后可以使用(添加任务 ID 和/或您想要拉取的 XCOM 密钥)来TriggerDagRunOperator()拉取 XCOM 。task_instance.xcom_pull(dag_id="DAG_B", ...)

如果您不反对编写 Python 运算符,也可以导入模型XCom并直接使用其XCom.get_one()方法

value = XCom.get_one(
    execution_date=ti.execution_date,
    key="target key",
    task_id="some.task.id",
    dag_id="DAG_B",
)
Run Code Online (Sandbox Code Playgroud)

我使用了类似的技术,使用多 dagrun 触发器(处理可变数量的资源);这比较棘手,因为在这种情况下你不能重复使用执行日期(每个 dagrun 必须有一个唯一的 (dag_id,execution_date) 元组)。

在这些情况下,我要么使用直接查询(通过触发器使用存储在 XCom 中的 dagrun id 将SQLAlchemyXCom模型加入到模型中,而不是依赖于执行日期匹配),要么通过预先配置 subdags 来避免整个问题。后者是通过设置子 dag 来实现的,配置告诉它在哪里输出父 dag 然后拾取的结果。文档似乎没有正确提及这一点,但参数也支持模板化,因此您可以在那里生成一个字典作为子 dag 的输入,子 dag 中的任务然后通过.DagRunconfTriggerDagRun()params