气流DAG每隔一分钟运行一次

Kyl*_*ine 3 airflow airflow-scheduler

我正在尝试将DAG安排为每分钟运行一次,但它似乎每秒都在运行。根据我已阅读的所有内容,我只需要schedule_interval='*/1 * * * *', #..every 1 minute在DAG中添加它,就是这样,但是它不起作用。这是我设置的一个简单示例进行测试:

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 4),
    'schedule_interval': '*/1 * * * *', #..every 1 minute
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    max_active_runs=3,
    schedule_interval='*/1 * * * *', #..every 1 minute
    default_args=default_args,
)

test= BashOperator(
    task_id='test',
    bash_command="echo hey >> /home/ec2-user/schedule_test.txt",
    retries=1,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

更新:

在与@Taylor Edmiston讨论了他的解决方案之后,我们意识到我需要添加的原因catchup=False是因为我使用Pip安装了Airflow,而Pip使用的是过时的Airflow版本。显然,如果您正在使用它的存储库主分支中的 Airflow,则无需包括catchup=False它即可像我尝试的那样每分钟运行一次。因此,尽管已接受的答案解决了我的问题,但仍无法解决@Taylor Edmiston发现的根本问题。

dar*_*ous 6

尝试添加catchup=FalseDAG()。由于start_date您已声明,DAG可能正在尝试回填。


Tay*_*ton 6

schedule_interval对DAG是正确的:*/1 * * * *每分钟

您也可以从中删除start_dateschedule_intervaldefault_args因为它们多余于提供给DAG的kwarg。

如果您从首次创建此DAG时开始更改了计划,则可能是Airflow变得混乱了。尝试删除数据库中的DAG,然后重新启动调度程序和Web服务器。如果您位于Airflow的主分支上,则只需$ airflow delete_dag my_dag; 否则,链接的答案将说明如何在其他版本上执行此操作。

我将您的代码简化为要检查的内容,并且在Airflow的主分支中运行时,肯定每分钟运行一次DAG。

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    schedule_interval='*/1 * * * *',
    default_args=default_args,
)

test = BashOperator(
    task_id='test',
    bash_command='echo "one minute test"',
    dag=dag,
)
Run Code Online (Sandbox Code Playgroud)

DAG运行:

在此处输入图片说明