如何获得气流dag的JobID?

Che*_*n J 7 python pyspark airflow apache-airflow

当我们在Airflow用户界面上执行dagrun时,在"图表视图"中,我们会获得每个作业运行的详细信息.

JobID类似于"schedule__2017-04-11T10:47:00".

我需要这个JobID用于跟踪和日志创建,我在其中维护每个任务/ dagrun所花费的时间.

所以我的问题是我如何在正在运行的同一个dag中获得JobID.

谢谢,阿赫亚

jhn*_*lvr 9

实际调用此值run_id,可以通过上下文或宏访问.

在python运算符中,这是通过上下文访问的,在bash运算符中,可以通过bash_command字段上的jinja模板访问它.

有关宏中可用内容的更多信息:

https://airflow.incubator.apache.org/code.html#macros

关于jinja的更多信息:

https://airflow.incubator.apache.org/concepts.html#jinja-templating

from airflow.models import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


dag = DAG(
    dag_id='run_id',
    schedule_interval=None,
    start_date=datetime(2017, 2, 26)
)

def my_func(**kwargs):
    context = kwargs
    print(context['dag_run'].run_id)

t1 = PythonOperator(
    task_id='python_run_id',
    python_callable=my_func,
    provide_context=True,
    dag=dag
    )

t2 = BashOperator(
    task_id='bash_run_id',
    bash_command='echo {{run_id}}',
    dag=dag)

t1.set_downstream(t2)
Run Code Online (Sandbox Code Playgroud)

使用此dag作为示例,并检查每个操作员的日志,您应该看到run_id打印在日志中.

  • 这仅适用于预定运行吗?我从 CLI 运行并得到 `kwargs['dag_run']` 为 None (4认同)