我在一个 dag 中有 30 个单独的任务,它们之间没有依赖关系。这些任务运行相同的代码。唯一的区别是数据量,有些任务会在几秒钟内完成,有些任务需要 2 小时或更长时间。
问题是在追赶期间,在几秒钟内完成的任务会被需要数小时才能完成才能进入下一个执行日期的任务所阻止。
我可以将它们分成单独的 dag,但这似乎很愚蠢,而且 30 个任务将来会增加到更多。
有没有办法在不同的执行时间在同一个 dag 中运行任务?就像任务一完成,就开始下一个执行日期,而不管其他任务的执行情况如何。
添加图片进行说明。基本上,我希望在第一排看到另外两个实心绿色方框,而第三排仍然落后。
编辑:
经过y2k-shubham的解释后,我尝试实现它。但它仍然不起作用。快速任务于 开始2019-01-30 00,在一秒内完成,并且不会启动2019-01-30 01,因为慢速任务仍在运行。如果可能的话,最好并行运行2019-01-30 01, 2019-01-30 02, 2019-01-30 03...
添加代码示例
import time
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2019, 1, 30, 0, 0, 0),
'trigger_rule': TriggerRule.DUMMY
}
dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')
def fast(**kwargs):
return 1
def slow(**kwargs):
time.sleep(600)
return 1
fast_task = PythonOperator(
task_id='fast',
python_callable=fast,
provide_context=True,
priority_weight=10000,
pool='fast_pool',
# weight_rule='upstream', # using 1.9, this param doesn't exist
dag=dag
)
slow_task = PythonOperator(
task_id='slow',
python_callable=slow,
provide_context=True,
priority_weight=500,
pool='slow_pool',
# weight_rule='upstream', # using 1.9, this param doesn't exist
dag=dag
)
fast_task >> slow_task # not working
Run Code Online (Sandbox Code Playgroud)
事实证明,可以设置两个变量,这将很容易解决我的问题。
concurrency和max_active_runs
在下面的示例中,您可以运行 4 个 dag,每个 dag 可以同时运行 4 个任务。其他组合也是可能的。
dag = DAG(
dag_id='sample_dag',
default_args=default_args,
schedule_interval='@daily',
# this will allow up to 16 tasks to be run at the same time
concurrency=16,
# this will allow up to 4 dags to be run at the same time
max_active_runs=4,
)
Run Code Online (Sandbox Code Playgroud)