Aar*_*ger 1 airflow airflow-2.x airflow-taskflow
我的 dag 是从配置 JSON 开始的:
\n{"foo" : "bar"}\nRun Code Online (Sandbox Code Playgroud)\n我有一个使用这个值的Python运算符:
\nmy_task = PythonOperator(\n task_id="my_task",\n op_kwargs={"foo": "{{ dag_run.conf[\'foo\'] }}"},\n python_callable=lambda foo: print(foo))\nRun Code Online (Sandbox Code Playgroud)\n我\xe2\x80\x99d喜欢用TaskFlow任务\xe2\x80\xa6替换它
\nmy_task = PythonOperator(\n task_id="my_task",\n op_kwargs={"foo": "{{ dag_run.conf[\'foo\'] }}"},\n python_callable=lambda foo: print(foo))\nRun Code Online (Sandbox Code Playgroud)\n如何从此处获取对 context、dag_run 的引用或以其他方式获取配置 JSON?
\nBas*_*lak 11
使用 TaskFlow API 有多种方法可以实现此目的:
import datetime
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
@dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
def so_75303816():
@task
def example_1(**context):
foo = context["dag_run"].conf["foo"]
print(foo)
@task
def example_2(dag_run=None):
foo = dag_run.conf["foo"]
print(foo)
@task
def example_3():
context = get_current_context()
foo = context["dag_run"].conf["foo"]
print(foo)
@task
def example_4(params=None):
foo = params["foo"]
print(foo)
example_1()
example_2()
example_3()
example_4()
so_75303816()
Run Code Online (Sandbox Code Playgroud)
根据您的需求/偏好,您可以使用以下示例之一:
example_1:您获取所有任务实例上下文变量,并且必须提取“foo”。example_2:您通过参数明确声明您只需要dag_run来自任务实例上下文变量。请注意,您必须将默认参数设置为None.example_3:您还可以使用从任务内部获取任务实例上下文变量airflow.operators.python.get_current_context()。example_4:DAG 运行上下文也可通过名为“params”的变量获得。有关更多信息,请参阅https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#accessing-context-variables-in-decorated-tasks和https://airflow.apache.org /docs/apache-airflow/stable/templates-ref.html#variables。
| 归档时间: |
|
| 查看次数: |
2684 次 |
| 最近记录: |