luc*_*one 4 airflow airflow-scheduler
我已安排执行DAG每天运行。它完美地工作了一天。
但是,每天我不仅要对当天{{ds}}重新执行,还要对前n天重新执行(比如说n = 7)。
例如,在计划于“ 2018-01-30”运行的下一次执行中,我希望Airflow不仅以执行日期“ 2018-01-30”运行DAG,而且希望为所有用户重新运行DAG从“ 2018-01-23”到“ 2018-01-30”的前几天。
有没有一种简单的方法可以使之前的执行“无效”,从而自动运行回填?
小智 5
您可以循环动态地生成任务,并将偏移量传递给操作员。
这是Python的一个示例。
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import timedelta
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'schedule_interval': '0 10 * * *'
}
def check_trigger(execution_date, day_offset, **kwargs):
target_date = execution_date - timedelta(days=day_offset)
# use target_date
for day_offset in xrange(1, 8):
PythonOperator(
task_id='task_offset_' + i,
python_callable=check_trigger,
provide_context=True,
dag=dag,
op_kwargs={'day_offset' : day_offset}
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1654 次 |
| 最近记录: |