气流:每天重新执行过去N天的DAG作业

luc*_*one 4 airflow airflow-scheduler

我已安排执行DAG每天运行。它完美地工作了一天。

但是,每天我不仅要对当天{{ds}}重新执行,还要对前n天重新执行(比如说n = 7)。

例如,在计划于“ 2018-01-30”运行的下一次执行中,我希望Airflow不仅以执行日期“ 2018-01-30”运行DAG,而且希望为所有用户重新运行DAG从“ 2018-01-23”到“ 2018-01-30”的前几天。

有没有一种简单的方法可以使之前的执行“无效”,从而自动运行回填?

小智 5

您可以循环动态地生成任务,并将偏移量传递给操作员。

这是Python的一个示例。

import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

from datetime import timedelta


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'schedule_interval': '0 10 * * *'
}

def check_trigger(execution_date, day_offset, **kwargs):
    target_date = execution_date - timedelta(days=day_offset)
    # use target_date

for day_offset in xrange(1, 8):
    PythonOperator(
        task_id='task_offset_' + i,
        python_callable=check_trigger,
        provide_context=True,
        dag=dag,
        op_kwargs={'day_offset' : day_offset}
    )
Run Code Online (Sandbox Code Playgroud)