Art*_*ano 2 python parallel-processing airflow
我目前正在开发一个 DAG,需要每月循环处理一长串任务。
为了实现这一点,我创建一个空列表,然后循环多个任务,根据新的月份更改它们的 task_id。
例子:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.helpers import chain
dag = DAG(
"import_trx_table",
default_args=default_args,
schedule_interval="45 9 * * *",
)
task_list = []
for month_ds in month_lst_ds:
start = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)
task_list.append(start)
...
chain(*task_list)
Run Code Online (Sandbox Code Playgroud)
问题是 DAG 现在非常长(而且很慢)(因为我迭代了超过 12 个月)。其中一些任务肯定可以并行运行。
我试图检查链辅助函数是否有并行链接的方式,但找不到任何东西。
有什么建议/想法吗?
谢谢。
您可以使用chain
,但它在这里并没有真正提供价值。
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 6, 7),
}
dag = DAG(
"stackoverflow_question",
default_args=default_args,
schedule_interval="@daily",
)
month_lst_ds = ['Dec', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov']
start_op = DummyOperator(task_id='start_task', dag=dag)
end_op = DummyOperator(task_id='end_task', dag=dag)
for month_ds in month_lst_ds:
month_op = DummyOperator(task_id=f"dummy_start_trx_table_imports_{month_ds}", dag=dag)
start_op >> month_op >> end_op
Run Code Online (Sandbox Code Playgroud)
如果您愿意,可以更换
start_op >> month_op >> end_op
Run Code Online (Sandbox Code Playgroud)
和
chain(start_op, month_op, end_op )
Run Code Online (Sandbox Code Playgroud)
这是 DAG 结构在图形视图中的外观:
归档时间: |
|
查看次数: |
12267 次 |
最近记录: |