我在气流 DAG 中运行 5 个 PythonOperator 任务,其中之一正在执行 ETL 作业,该作业需要很长时间,因此我的所有资源都被阻塞。有没有办法可以设置每个任务的最大执行时间,之后任务要么失败,要么通过消息标记为成功(以便 DAG 不会失败)?
我今天尝试创建我的第一个气流 DAG:
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'default_user',
'start_date': days_ago(2),
'depends_on_past': True,
# With this set to true, the pipeline won't run if the previous day failed
'email': ['demo@email.de'],
'email_on_failure': True,
# upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
dag = DAG(
'basic_dag_2',
default_args=default_args, …Run Code Online (Sandbox Code Playgroud) 我在一个 dag 中有 30 个单独的任务,它们之间没有依赖关系。这些任务运行相同的代码。唯一的区别是数据量,有些任务会在几秒钟内完成,有些任务需要 2 小时或更长时间。
问题是在追赶期间,在几秒钟内完成的任务会被需要数小时才能完成才能进入下一个执行日期的任务所阻止。
我可以将它们分成单独的 dag,但这似乎很愚蠢,而且 30 个任务将来会增加到更多。
有没有办法在不同的执行时间在同一个 dag 中运行任务?就像任务一完成,就开始下一个执行日期,而不管其他任务的执行情况如何。
添加图片进行说明。基本上,我希望在第一排看到另外两个实心绿色方框,而第三排仍然落后。
编辑:
经过y2k-shubham的解释后,我尝试实现它。但它仍然不起作用。快速任务于 开始2019-01-30 00,在一秒内完成,并且不会启动2019-01-30 01,因为慢速任务仍在运行。如果可能的话,最好并行运行2019-01-30 01, 2019-01-30 02, 2019-01-30 03...
添加代码示例
import time
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2019, 1, 30, 0, 0, 0),
'trigger_rule': TriggerRule.DUMMY
}
dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')
def fast(**kwargs):
return …Run Code Online (Sandbox Code Playgroud)