气流没有正确安排Python

The*_*nse 11 python airflow

码:

Python版本2.7.x和气流版本1.5.1

我的dag脚本就是这个

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)
Run Code Online (Sandbox Code Playgroud)

从那你可以看出我正在创建一个包含6个任务的DAG,第一个任务(Start1)首先启动,之后所有其他五个任务开始

目前我在DAG开始之间给出了5分钟的时间延迟

它已经完美地运行了第一种类型的所有六个任务,但是在五分钟后DAG没有重新启动

已经超过1小时DAG没有重新启动我真的不知道我错了.

如果有人可以指出我出了什么问题,那将是非常好的.我尝试用airflow testing clear同样的事情清理然后发生.它运行第一个实例然后就站在那里.

命令行显示的唯一内容是 Getting all instance for DAG testing

当我更改schedule_interval的位置时,它只运行任何调度间隔parallel.That在5分钟内完成300或更多任务实例.没有5分钟的计划间隔

代码2:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)
Run Code Online (Sandbox Code Playgroud)

Yon*_*yiw 8

对于Code 2,我猜它每分钟运行的原因是:

  1. 开始时间是2015-10-13 00:00

  2. 计划间隔为5分钟

  3. 调度程序的每次心跳(默认为5秒),都会检查您的DAG

    • 首先检查:开始日期(没有找到上次执行日期)+调度程序间隔<当前时间?如果是,则执行DAG并记录上次执行时间.(例如2015-10-13 00:00 + 5min <当前?)
    • 第二次检查下一次心跳:上次执行时间+调度程序间隔<当前时间?如果是这样,DAG将再次执行.
    • ....

解决方案是将DAG start_date设置为datetime.now() - schedule_interval.

如果你想调试:

  1. 将LOGGINGLEVEL debug设置为settings.py

  2. 修改类方法is_queueable()airflow.models.TaskInstance

:

def is_queueable(self, flag_upstream_failed=False):
    logging.debug('Checking whether task instance is queueable or not!')
    if self.execution_date > datetime.now() - self.task.schedule_interval:
        logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now()))
        return False
        ...
Run Code Online (Sandbox Code Playgroud)