我可以使用 TriggerDagRunOperator 将参数传递给触发的 dag 吗?空气流动

Isr*_*uez 6 python airflow

我正在使用 Airflow 1.10.11

我可以看到我可以使用 UI 和 CLI 将参数发送到 dag 执行(https://airflow.apache.org/docs/apache-airflow/1.10.11/dag-run.html),但是我可以这样做吗使用 TriggerDagRunOperator ?(https://airflow.apache.org/docs/apache-airflow/1.10.11/_api/airflow/operators/dagrun_operator/index.html)。

阅读https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py,https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py似乎​就像我可以的那样,但是当我自己尝试这个脚本时,我无法检索conf消息(可能是因为他们似乎使用airflow 2.0,而我使用1.10)。

所以我的问题是:当 TriggerDagRunOperator 该目标时,有没有办法将参数发送到目标 dag?

提前致谢

Ala*_* Ma 7

下面是一个示例,演示如何设置由TriggerDagRunOperator 触发的 dagruns 发送的conf(1.10.11 中)

触发器.py

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

dag = DAG(
    dag_id='trigger',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)


def modify_dro(context, dagrun_order):
    print(context)
    print(dagrun_order)
    dagrun_order.payload = {
        "message": "This is my conf message"
    }
    return dagrun_order


run_this = TriggerDagRunOperator(
    task_id='run_this',
    trigger_dag_id='my_dag',
    python_callable=modify_dro,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

达格.py

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='my_dag',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)


def run_this_func(**context):
    print(context["dag_run"].conf)


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

这是DAGrun_this中任务的输出my_dag

[2021-03-05 16:39:15,649] {logging_mixin.py:112} INFO - {'message': 'This is my conf message'}
Run Code Online (Sandbox Code Playgroud)

TriggerDagRunOperator 接受一个 python_callable,允许您修改 DagRunOrder。DagRunOrder 是一个抽象构造,它保存运行 ID 和有效负载,当触发目标 DAG 时,这些运行 ID 和有效负载将作为 conf 发送

  • AirFlow 2.0 中删除了 python_callable TriggerDagRunOperator (3认同)

ims*_*eth 6

对于airflow2.0.x,您需要使用conf来传递参数。

python_callable 如 PR 中所述,不推荐使用

https://github.com/apache/airflow/pull/6317

欲了解更多详情,请参阅

/sf/answers/4707816711/