如何将 DAG 运行日期传递给任务?

jim*_*arp 5 directed-acyclic-graphs airflow

我是气流新手,并试图弄清楚如何将 DAG 运行日期传递给每个任务,我的 DAG 中有以下内容:

tzinfo=tz.gettz('America/Los_Angeles')
dag_run_date = datetime.now(_tzinfo)

dag = DAG(
    'myDag', 
    default_args=default_args,
    schedule_interval = None,
    params = {
        "runDateTimeTz" : dag_run_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")
    }
)
Run Code Online (Sandbox Code Playgroud)

然后我尝试将 runDateTimeTz 参数传递给我的每个任务,如下所示。

task1 = GKEPodOperator(
    image='gcr.io/myJar:1.0.1.45',           
    cmds=['java'],
    arguments=["-jar","myJar.jar", {{params.runDateTimeTz}}"],
    dag=dag)

task2 = GKEPodOperator(
    image='gcr.io/myJar2:1.0.1.45',           
    cmds=['java'],
    arguments=["-jar","myJar2.jar", {{params.runDateTimeTz}}"],
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

我的任务已正确执行,但我希望所有任务都能在 params.runDateTimeTz 中收到相同的运行日期,但它没有发生,例如任务 1 获取params.runDateTimeTz=2020-04-16T07:42:47.412716-07:00和任务 2 获取params.runDateTimeTz= 2020-04-16T07:43:29.913289-07:00

我想这种行为与气流填充 DAG 的方式有关params,看起来 params.runDateTimeTz 仅在任务开始运行时才获取,但我想在之前获取它并将其作为参数发送到每个任务,期望所有任务获得相同的值。

有人可以帮助我解决我做错的事情吗?

kax*_*xil 5

您可以使用Airflow 宏中的execution_date或:ds

详细信息:https://airflow.apache.org/docs/stable/macros-ref#default-variables

task1 = GKEPodOperator(
    image='gcr.io/myJar:1.0.1.45',           
    cmds=['java'],
    arguments=["-jar","myJar.jar", "{{ ds }}"],
    dag=dag)

task2 = GKEPodOperator(
    image='gcr.io/myJar2:1.0.1.45',           
    cmds=['java'],
    arguments=["-jar","myJar2.jar", "{{ ds }}"],
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

如果您需要时间戳,可以使用{{ ts }}