我能否以编程方式确定 Airflow DAG 是计划的还是手动触发的?

Blo*_*je5 7 airflow

我想创建一个片段,根据 DAG 是计划的还是手动触发的,传递正确的日期。DAG 每月运行一次。DAG 根据上个月的数据生成报告(SQL 查询)。

如果我运行 DAG 计划,我可以使用以下 jinja 片段获取上个月:

execution_date.month
Run Code Online (Sandbox Code Playgroud)

鉴于 DAG 安排在上一期(上个月)结束时, execution_date 将正确返回上个月。但是在手动运行时,这将返回当前月份(执行日期将是手动触发的日期)。

我想写一个简单的宏来处理这种情况。但是,我找不到以编程方式查询 DAG 是否以编程方式触发的好方法。我能想出的最好的是获取run_id从数据库(通过创建具有数据库会话宏),检查wheter的run_id包含单词manual。有没有更好的方法来解决这个问题?

Raf*_*aad 7

目前没有直接的 DAG 属性来识别手动运行。要获得此信息,您需要检查run_id您提到的。

但是,有一个专用的宏获取run_id. 您不必自己从数据库中获取它。这是一个关于如何使用它的示例:

    def some_task_py(**context):
        run_id = context['templates_dict']['run_id']
        is_manual = run_id.startswith('manual__')
        is_scheduled = run_id.startswith('scheduled__')


    some_task = PythonOperator(
                task_id = 'some_task',
                dag=dag,
                templates_dict = {'run_id': '{{ run_id }}'},
                python_callable = some_task_py,
                provide_context = True)
Run Code Online (Sandbox Code Playgroud)


Don*_*lon 6

tl;dr:你可以用DagRun.external_trigger.


我注意到在树视图中,有一个围绕计划运行的大纲,但不是手动运行。那是因为后者已经stroke-opacity: 0;应用于CSS。

为此在 repo 中搜索,我发现了Airflow 开发人员如何检测手动运行(5 年前的线路,因此也应该在旧版本中工作):

.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})
Run Code Online (Sandbox Code Playgroud)

搜索external_trigger将我们带到了DagRun定义

因此,例如,如果您使用的是 Python 回调,则可以使用以下内容(可以在 DAG 或单独的文件中定义):

def my_fun(context):
    if context.get('dag_run').external_trigger:
        print('manual run')
    else:
        print('scheduled run')
Run Code Online (Sandbox Code Playgroud)

并在您的Operator设置中设置如下参数:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    on_failure_callback=my_fun,
    dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

我已经测试过类似的东西并且它有效。

我认为你也可以做类似 if if {{ dag_run.external_trigger }}:- 但我没有测试过这个,我相信它只能在那个 DAG 的文件中工作。