气流| 设置变量

CJL*_*L89 2 json airflow

有谁知道从 Airflow 中的 DAG 更新变量(但采用 JSON 格式)的语法?

现在我有这个:

Variable.set(f"update_{kwargs['table_id']}", *last_update)

这会将变量更新为:

  • updated_giftcard_id 0
  • updated_order_id 0

但这会产生不可扩展的新变量。

理想情况下,我想更新相同的变量并以 JSON 格式传递它们:

例如:"last_ids": {"updated_giftcard_id:" "0", "updated_order_id:" "1", etc}"

我尝试传递一些参数,例如Variable.set(key="updated_giftcard_id", value="0", serialize=True),但无法正常工作,因为它抱怨我传递了太多参数。

提前致谢!

Jos*_*ell 5

和方法确实有一个序列化/反序列化参数,Variable.set()和,分别用于本机处理 JSON 类型变量。Variable.get()serialize_jsondeserialize_json

你可以从这样的事情推断:

import logging
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime


@task
def variable_set():
    Variable.set(key="updated_giftcard_id", value="0", serialize_json=True)
    Variable.set(
        key="last_ids", value={"updated_giftcard_id": "0", "updated_order_id": "1"}, serialize_json=True
    )

@task
def variable_get():
    logging.info(Variable.get(key="updated_giftcard_id", deserialize_json=True))
    logging.info(Variable.get(key="last_ids", deserialize_json=True))

with DAG(
    "example",
    start_date=datetime(2021, 8, 13),
    schedule_interval=None,
    catchup=False,
) as dag:
    variable_set() >> variable_get()
Run Code Online (Sandbox Code Playgroud)

“variable_set”任务的结果: 写入 JSON 变量 “variable_get”任务的结果: 读取 JSON 变量