时区感知 DAG 的气流执行日期错误

Mao*_*eli 5 timezone jinja2 airflow

我们在 Kubernetes (KubernetesExecutor) 上使用 Airflow v2.2.3,我们的环境需要 DAG 预客户,并且每个客户可以位于不同的时区。

每个 DAG 应该在午夜安排在自己的时区,我发现可以使用 Airflow 的时区感知 DAG来实现

因此,为每个 DAG 配置时区感知是有效的start_date,并使每个 DAG 在自己的时区午夜执行:

start_date_utc = (datetime.now() - timedelta(days=2)).replace(
        hour=0, minute=0, second=0, microsecond=0)

timezone = pendulum.timezone(get_customer_timezone(customer))
START_DATE = start_date_utc.replace(tzinfo=timezone)

default_args = {
        "owner": "owner",
        "depends_on_past": False,
        "start_date": START_DATE,
}

dag = DAG(
    dag_id,
    schedule_interval="0 0 * * *",
    default_args=default_args,
    tags=[cusotmer_name]
)

date = '{{ execution_date | ds }}'

operator_args = {
    "customer_date": date,
}
Run Code Online (Sandbox Code Playgroud)

我的问题是jinja 模板和 dag_run execution_date(dag_run.logic_date) 仍然是 UTC,并且没有根据 DAG 时区进行调整。

在不同时区运行 DAG 时,这会导致意外行为,但execution_date时区偏移早于 UTC 的 DAG 是错误的(2 天前,而不是 1 天)

我需要一些关于如何根据execution_dateDAG 时区更改的建议

谢谢

Ela*_*lad 7

的值execution_date采用 UTC 时间。要转换为不同的时区,您可以执行以下操作:

{{ execution_date.in_timezone('Europe/Amsterdam') }}
Run Code Online (Sandbox Code Playgroud)

如果您在 DAG 中设置时区,您可以从dag.timezoneJinja 访问并使用它。

{{ dag_run.execution_date.astimezone(dag.timezone) }}
Run Code Online (Sandbox Code Playgroud)

您已经注意到该execution_date宏已被弃用,因此您应该使用logical_date

{{ dag_run.logical_date.astimezone(dag.timezone) }}
Run Code Online (Sandbox Code Playgroud)