在 TriggerDagRunOperator 中提供上下文

efb*_*own 3 python airflow

我有一个由另一个 dag 触发的 dag。我已经通过DagRunOrder().payload字典以与官方示例相同的方式将一些配置变量传递给了这个 dag 。

现在在这个 dag 中,我有另一个 dagTriggerDagRunOperator来启动第二个 dag,并希望通过这些相同的配置变量。

我已经成功地访问了有效载荷变量,PythonOperator如下所示:

def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for message and {} for day".format(
        kwargs["dag_run"].conf["message"], kwargs["dag_run"].conf["day"])
    )

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

但是相同的模式在以下情况下不起作用TriggerDagRunOperator

def trigger(context, dag_run_obj, **kwargs):
    dag_run_obj.payload = {
        "message": kwargs["dag_run"].conf["message"],
        "day": kwargs["dag_run"].conf["day"]
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    provide_context=True,
    python_callable=trigger,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

它会产生关于使用的警告provide_context

INFO - Subtask: /usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
INFO - Subtask: *args: ()
INFO - Subtask: **kwargs: {'provide_context': True}
INFO - Subtask:   category=PendingDeprecationWarning
Run Code Online (Sandbox Code Playgroud)

这个错误表明我没有通过 conf :

INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
INFO - Subtask:     result = task_copy.execute(context=context)
INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/operators/dagrun_operator.py", line 64, in execute
INFO - Subtask:     dro = self.python_callable(context, dro)
INFO - Subtask:   File "/home/user/airflow/dags/dummy_responses.py", line 28, in trigger
INFO - Subtask:     "message": kwargs["dag_run"].conf["message"],
INFO - Subtask: KeyError: 'dag_run'
Run Code Online (Sandbox Code Playgroud)

我尝试过的第二种模式也没有奏效,是使用如下params参数:

def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context['params']['message'],
        "day": context['params']['day']
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    params={
        "message": "{{ dag_run.conf['message'] }}",
        "day": "{{ dag_run.conf['day'] }}"
    },
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

此模式不会产生错误,而是将参数作为字符串传递到下一个 dag,即它不计算表达式。


如何访问TriggerDagRunOperator第二个 dag 中的配置变量?

taa*_*ari 14

在 中Airflow2.0.x,相当于 的@efbbrown答案是:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    conf={"message": "{{ dag_run.conf['message'] }}", "day":"{{ 
    dag_run.conf['day'] }}"},
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

GitHub上的此处描述了拉取请求。

请参阅外部触发器trigger_dagrun的文档。

这是有关该主题的YouTube 视频,其中显示了正确的导入。


efb*_*own 7

解决了:

dag_run对象被存储在上下文等的配置变量可以在被访问python_callableTriggerDagRunOperator与该图案:

def trigger(context, dag_run_obj):
    dag_run_obj.payload = {
        "message": context["dag_run"].conf["message"],
        "day": context["dag_run"].conf["day"]
    }
    return dag_run_obj

trigger_step = TriggerDagRunOperator(
    task_id="trigger_modelling",
    trigger_dag_id="Dummy_Modelling",
    python_callable=trigger,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

  • 我可以使用 `"{{ dag_run.conf['message'] }}"` 和 `"{{ dag_run.conf['day'] }}"` 访问触发 dag 中的参数。这依赖于您尝试读取模板字段参数的运算符中的字段。如果 `"{{ dag_run.conf['day'] }}"` 模式对你不起作用,因为这些字段不是 template_fields,你将能够扩展你用来制作这些字段的操作符类模板字段。如果这没有意义,请告诉我,我会将其包含在我的答案中。 (2认同)
  • @efbbrown 这个解决方案在 Airflow v2.0.1 中不起作用;我收到此错误:“无效的参数已传递给 TriggerDagRunOperator”。您知道我们如何在 Airflow 版本 2 中的“TriggerDagRunOperator”中传递上下文吗? (2认同)