han*_*kuk 1 python directed-acyclic-graphs airflow
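I'm trying to create a DAG that spawns N tasks based on the result of a previous task. The problem is that I can't use the value returned by the previous task (stored in XCom) outside of an operator.
Is there a way to make this work?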
from datetime import datetime

# Airflow 1.10 import paths
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

# count_tasks_function, some_func, default_args and the upstream task
# `generate` are defined elsewhere (not shown)

with DAG(
    "spawn_dag",
    start_date=datetime(2022, 1, 1)
) as dag:
    # Calculates the number of tasks based on some previous task run
    count_number_of_tasks = PythonOperator(
        task_id='count_number_of_tasks',
        python_callable=count_tasks_function,
        dag=dag,
        xcom_push=True,
        provide_context=True
    )

    # Generates tasks and chains them
    def dynamic_spawn_func(parent_dag_name, child_dag_name, start_date, args, **kwargs):
        subdag = DAG(
            dag_id=f"{parent_dag_name}.{child_dag_name}",
            default_args=args,
            start_date=start_date,
            schedule_interval=None
        )
        # Here is the problem, the following variable cannot be used in a loop to spawn tasks
        number_of_tasks = kwargs['ti'].xcom_pull(dag_id='spawn_dag', task_ids='count_number_of_tasks')
        # This is where that variable is used
        for j in range(number_of_tasks):
            task = PythonOperator(
                task_id='processor_' + str(j),
                python_callable=some_func,
                op_kwargs={"val": j},
                dag=subdag,
                provide_context=True)
            task_2 = PythonOperator(
                task_id='wait_for_processor_' + str(j),
                python_callable=some_func,
                op_kwargs={"val": j},
                dag=subdag,
                provide_context=True)
            task >> task_2
        return subdag

    dynamic_spawn_op = SubDagOperator(
        task_id='dynamic_spawn',
        subdag=dynamic_spawn_func("spawn_dag", "dynamic_spawn", dag.start_date, args=default_args),
        dag=dag,
        provide_context=True
    )

    generate >> count_number_of_tasks >> dynamic_spawn_op
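No. Migrate to Airflow 2.3+. Airflow 1.10 has been end-of-life for two years, and by not upgrading you are shooting yourself in the foot. Not only are you missing new features (such as Dynamic Task Mapping), you are also leaving yourself very exposed to potential security issues (10 CVEs have been fixed since 1.10), and you are putting yourself in a difficult position, because you are one of the last people in the world still running Airflow 1.10.
Not upgrading at this stage is a very bad decision, because the cost of not upgrading is far higher than the cost of migrating. Several times higher.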