jim*_*arp 5 directed-acyclic-graphs airflow
我是气流新手,并试图弄清楚如何将 DAG 运行日期传递给每个任务,我的 DAG 中有以下内容:
tzinfo=tz.gettz('America/Los_Angeles')
dag_run_date = datetime.now(_tzinfo)
dag = DAG(
'myDag',
default_args=default_args,
schedule_interval = None,
params = {
"runDateTimeTz" : dag_run_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")
}
)
Run Code Online (Sandbox Code Playgroud)
然后我尝试将 runDateTimeTz 参数传递给我的每个任务,如下所示。
task1 = GKEPodOperator(
image='gcr.io/myJar:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar.jar", {{params.runDateTimeTz}}"],
dag=dag)
task2 = GKEPodOperator(
image='gcr.io/myJar2:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar2.jar", {{params.runDateTimeTz}}"],
dag=dag)
Run Code Online (Sandbox Code Playgroud)
我的任务已正确执行,但我希望所有任务都能在 params.runDateTimeTz 中收到相同的运行日期,但它没有发生,例如任务 1 获取params.runDateTimeTz=2020-04-16T07:42:47.412716-07:00和任务 2 获取params.runDateTimeTz= 2020-04-16T07:43:29.913289-07:00
我想这种行为与气流填充 DAG 的方式有关params,看起来 params.runDateTimeTz 仅在任务开始运行时才获取,但我想在之前获取它并将其作为参数发送到每个任务,期望所有任务获得相同的值。
有人可以帮助我解决我做错的事情吗?
您可以使用Airflow 宏中的execution_date或:ds
详细信息:https://airflow.apache.org/docs/stable/macros-ref#default-variables
task1 = GKEPodOperator(
image='gcr.io/myJar:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar.jar", "{{ ds }}"],
dag=dag)
task2 = GKEPodOperator(
image='gcr.io/myJar2:1.0.1.45',
cmds=['java'],
arguments=["-jar","myJar2.jar", "{{ ds }}"],
dag=dag)
Run Code Online (Sandbox Code Playgroud)
如果您需要时间戳,可以使用{{ ts }}
| 归档时间: |
|
| 查看次数: |
6276 次 |
| 最近记录: |