获取 Airflow TaskFlow 任务中的 dag_run 上下文

Aar*_*ger 1 airflow airflow-2.x airflow-taskflow

我的 dag 是从配置 JSON 开始的:

\n
{"foo" : "bar"}\n
Run Code Online (Sandbox Code Playgroud)\n

我有一个使用这个值的Python运算符:

\n
my_task = PythonOperator(\n    task_id="my_task",\n    op_kwargs={"foo": "{{ dag_run.conf[\'foo\'] }}"},\n    python_callable=lambda foo: print(foo))\n
Run Code Online (Sandbox Code Playgroud)\n

我\xe2\x80\x99d喜欢用TaskFlow任务\xe2\x80\xa6替换它

\n
my_task = PythonOperator(\n    task_id="my_task",\n    op_kwargs={"foo": "{{ dag_run.conf[\'foo\'] }}"},\n    python_callable=lambda foo: print(foo))\n
Run Code Online (Sandbox Code Playgroud)\n

如何从此处获取对 context、dag_run 的引用或以其他方式获取配置 JSON?

\n

Bas*_*lak 11

使用 TaskFlow API 有多种方法可以实现此目的:

import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


@dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
def so_75303816():
    @task
    def example_1(**context):
        foo = context["dag_run"].conf["foo"]
        print(foo)

    @task
    def example_2(dag_run=None):
        foo = dag_run.conf["foo"]
        print(foo)

    @task
    def example_3():
        context = get_current_context()
        foo = context["dag_run"].conf["foo"]
        print(foo)

    @task
    def example_4(params=None):
        foo = params["foo"]
        print(foo)

    example_1()
    example_2()
    example_3()
    example_4()


so_75303816()
Run Code Online (Sandbox Code Playgroud)

根据您的需求/偏好,您可以使用以下示例之一:

  • example_1:您获取所有任务实例上下文变量,并且必须提取“foo”。
  • example_2:您通过参数明确声明您只需要dag_run来自任务实例上下文变量。请注意,您必须将默认参数设置为None.
  • example_3:您还可以使用从任务内部获取任务实例上下文变量airflow.operators.python.get_current_context()
  • example_4:DAG 运行上下文也可通过名为“params”的变量获得。

有关更多信息,请参阅https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#accessing-context-variables-in-decorated-taskshttps://airflow.apache.org /docs/apache-airflow/stable/templates-ref.html#variables