如何正确工作气流schedule_interval

k16*_*k16 8 airflow

我想尝试使用Airflow而不是Cron.但是schedule_interval不能像我预期的那样工作.

我写了如下的python代码.
根据我的理解,Airflow应该在"2016/03/30 8:15:00"运行但当时没有用.

如果我改变它像''schedule_interval':timedelta(分钟= 5)",我认为它正常工作.

"notice_slack.sh"只是为我的频道调用slack api.

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29, 8, 15),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = '/tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)
Run Code Online (Sandbox Code Playgroud)

我想在每天的特定时间运行我的一些脚本,就像这个cron设置一样.

15 08 * * * bash /tmp/notice_slack.sh
Run Code Online (Sandbox Code Playgroud)

我已阅读文档Scheduling&Triggers,我知道它有点不同的cron.
所以我尝试安排"start_date"和"schedule_interval"设置.

有谁知道我该怎么办?

气流版

信息 - 使用执行程序LocalExecutor

V1.7.0

亚马逊的Linux AMI/2015.09释放小笔记

p.m*_*aes 11

当2016/03/30 8:15:00 +计划间隔(每日)通过时,气流将启动您的DAG.所以你的DAG将在2016/03/31 8:15:00运行.

您可以查看Airflow FAQ

  • 这个答案不正确。`start_date` 参数只是 DAG 运行开始后的日期时间。但真正的时间表包含参数“schedule_interval”。`@daily` 值表示 DAG 必须在午夜运行。每天 08:15 运行:`schedule_interval='15 08 * * *'`。 (3认同)

小智 10

试试这个:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="15 08 * * *",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = 'bash /tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)
Run Code Online (Sandbox Code Playgroud)

start_date(datetime) - 任务的start_date确定第一个任务实例的execution_date.最佳做法是将start_date四舍五入到DAG的schedule_interval.

schedule_interval (datetime.timedelta或dateutil.relativedelta.relativedelta或作为cron表达式的str) - 定义DAG运行的频率,此timedelta对象被添加到最新任务实例的execution_date以计算下一个计划.

只需在您的cron设置中配置schedule_intervalbash_command相同即可.

  • 值得注意的是,`execution_date` 将是刚刚结束的间隔的开始。因此,使用此设置,第一次运行将在预定的 `dag_run` 中的日期为 `2016 03 29T08:15:00.000`,这是传入 `execution_date` 的内容,但它会在 `execution_date` 之后触发此运行2016 03 30T08:15:00`,即从 `execution_date` 开始的完整时间间隔已经过去。 (2认同)

小智 6

如果您不确定如何创建气流 cron 表达式,您可以尝试使用crontab.guru

  • 这至少需要一些解释才能成为有用的答案。 (3认同)

Sun*_*yAk 5

首先,你的开始日期应该在过去 - 而不是'start_date': datetime(2016, 3, 29, 8, 15) 你会尝试'start_date': datetime(2016, 2, 29, 8, 15)

并应用 'catchup':False 以防止回填 - 除非这是您想做的事情。

来自 Airflow 文档 - Airflow 调度程序在通过 start_date + schedule_interval 后不久触发任务。

调度间隔可以作为一个 cron 提供 - 如果你想每天早上 8 点 15 分运行它,表达式将是 - * '15 8 * * '

如果您只想在 10 月 31 日上午 8 点 15 分运行它,则表达式为 - * '15 8 31 10 '

要提供此信息,请'schedule_inteval':'15 8 * * *'在您的 Dag 属性中

您可以从https://crontab.guru/了解更多信息

或者,有气流预设 - 在此处输入图片说明

如果其中任何一个满足您的要求,那就很简单, 'schedule_interval':'@hourly'

最后,您还可以将时间表应用为 python timedelta 对象,例如 12 PM

'schedule_interval': timedelta(hours=12)