我有一个airflow计划每 3 分钟运行一次的任务。
有时任务的持续时间超过 3 分钟,并且下一个计划开始(或排队),尽管它已经在运行。
有没有一种方法可以定义 dag,如果任务已经在运行,甚至不对其进行排队?
# airflow related
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators import MsSqlOperator
# other packages
from datetime import datetime
from datetime import timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 7, 22, 15,00,00),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
dag = DAG(
dag_id='sales',
description='Run sales',
schedule_interval = '*/3 4,5,6,7,8,9,10,11,12,13,14,15,16,17 * * 0-5',
default_args=default_args,
catchup = False)
job1 = BashOperator(
task_id='sales',
bash_command='python2 /home/manager/ETL/sales.py',
dag = dag)
job2 = MsSqlOperator(
task_id='refresh_tabular',
mssql_conn_id='mssql_globrands',
sql="USE msdb ; EXEC dbo.sp_start_job N'refresh Management-sales' ; ",
dag = dag)
job1>>job2
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
939 次 |
| 最近记录: |