我想尝试使用Airflow而不是Cron.但是schedule_interval不能像我预期的那样工作.
我写了如下的python代码.
根据我的理解,Airflow应该在"2016/03/30 8:15:00"运行但当时没有用.
如果我改变它像''schedule_interval':timedelta(分钟= 5)",我认为它正常工作.
"notice_slack.sh"只是为我的频道调用slack api.
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 3, 29, 8, 15),
}
dag = DAG(
dag_id='notice_slack',
default_args=args,
schedule_interval="@daily",
dagrun_timeout=timedelta(minutes=1))
# cmd file name
CMD = '/tmp/notice_slack.sh'
run_this = BashOperator(
task_id='run_transport', bash_command=CMD, dag=dag)
Run Code Online (Sandbox Code Playgroud)
我想在每天的特定时间运行我的一些脚本,就像这个cron设置一样.
15 08 * * * bash /tmp/notice_slack.sh
Run Code Online (Sandbox Code Playgroud)
我已阅读文档Scheduling&Triggers,我知道它有点不同的cron.
所以我尝试安排"start_date"和"schedule_interval"设置.
有谁知道我该怎么办?
气流版
信息 - 使用执行程序LocalExecutor
V1.7.0
亚马逊的Linux AMI/2015.09释放小笔记
p.m*_*aes 11
当2016/03/30 8:15:00 +计划间隔(每日)通过时,气流将启动您的DAG.所以你的DAG将在2016/03/31 8:15:00运行.
您可以查看Airflow FAQ
小智 10
试试这个:
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 3, 29),
}
dag = DAG(
dag_id='notice_slack',
default_args=args,
schedule_interval="15 08 * * *",
dagrun_timeout=timedelta(minutes=1))
# cmd file name
CMD = 'bash /tmp/notice_slack.sh'
run_this = BashOperator(
task_id='run_transport', bash_command=CMD, dag=dag)
Run Code Online (Sandbox Code Playgroud)
start_date
(datetime) - 任务的start_date确定第一个任务实例的execution_date.最佳做法是将start_date四舍五入到DAG的schedule_interval.
schedule_interval
(datetime.timedelta或dateutil.relativedelta.relativedelta或作为cron表达式的str) - 定义DAG运行的频率,此timedelta对象被添加到最新任务实例的execution_date以计算下一个计划.
只需在您的cron设置中配置schedule_interval
和bash_command
相同即可.
首先,你的开始日期应该在过去 - 而不是'start_date': datetime(2016, 3, 29, 8, 15)
你会尝试'start_date': datetime(2016, 2, 29, 8, 15)
并应用 'catchup':False 以防止回填 - 除非这是您想做的事情。
来自 Airflow 文档 - Airflow 调度程序在通过 start_date + schedule_interval 后不久触发任务。
调度间隔可以作为一个 cron 提供 - 如果你想每天早上 8 点 15 分运行它,表达式将是 - * '15 8 * * '
如果您只想在 10 月 31 日上午 8 点 15 分运行它,则表达式为 - * '15 8 31 10 '
要提供此信息,请'schedule_inteval':'15 8 * * *'
在您的 Dag 属性中
您可以从https://crontab.guru/了解更多信息
如果其中任何一个满足您的要求,那就很简单, 'schedule_interval':'@hourly'
最后,您还可以将时间表应用为 python timedelta 对象,例如 12 PM
'schedule_interval': timedelta(hours=12)
归档时间: |
|
查看次数: |
29752 次 |
最近记录: |