Kyl*_*ine 3 airflow airflow-scheduler
我正在尝试将DAG安排为每分钟运行一次,但它似乎每秒都在运行。根据我已阅读的所有内容,我只需要schedule_interval='*/1 * * * *', #..every 1 minute在DAG中添加它,就是这样,但是它不起作用。这是我设置的一个简单示例进行测试:
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 6, 4),
'schedule_interval': '*/1 * * * *', #..every 1 minute
'email': ['airflow@airflow.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
dag_id='airflow_slack_example',
start_date=datetime(2018, 6, 4),
max_active_runs=3,
schedule_interval='*/1 * * * *', #..every 1 minute
default_args=default_args,
)
test= BashOperator(
task_id='test',
bash_command="echo hey >> /home/ec2-user/schedule_test.txt",
retries=1,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
更新:
在与@Taylor Edmiston讨论了他的解决方案之后,我们意识到我需要添加的原因catchup=False是因为我使用Pip安装了Airflow,而Pip使用的是过时的Airflow版本。显然,如果您正在使用它的存储库主分支中的 Airflow,则无需包括catchup=False它即可像我尝试的那样每分钟运行一次。因此,尽管已接受的答案解决了我的问题,但仍无法解决@Taylor Edmiston发现的根本问题。
您schedule_interval对DAG是正确的:*/1 * * * *为每分钟。
您也可以从中删除start_date和schedule_interval,default_args因为它们多余于提供给DAG的kwarg。
如果您从首次创建此DAG时开始更改了计划,则可能是Airflow变得混乱了。尝试删除数据库中的DAG,然后重新启动调度程序和Web服务器。如果您位于Airflow的主分支上,则只需$ airflow delete_dag my_dag; 否则,链接的答案将说明如何在其他版本上执行此操作。
我将您的代码简化为要检查的内容,并且在Airflow的主分支中运行时,肯定每分钟运行一次DAG。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
}
dag = DAG(
dag_id='airflow_slack_example',
start_date=datetime(2018, 6, 4),
schedule_interval='*/1 * * * *',
default_args=default_args,
)
test = BashOperator(
task_id='test',
bash_command='echo "one minute test"',
dag=dag,
)
Run Code Online (Sandbox Code Playgroud)
DAG运行:
| 归档时间: |
|
| 查看次数: |
1842 次 |
| 最近记录: |