我正在尝试创建一个 DAG,它将根据前一个任务的结果生成 N 个任务。问题是我无法在 Operator 之外使用上一个任务(在 XCom 中)返回的值
有办法让这项工作发挥作用吗?
with DAG(
"spawn_dag",
start_date=datetime(2022, 1, 1)
) as dag:
# Calculates the number of tasks based on some previous task run
count_number_of_tasks = PythonOperator(
task_id='count_number_of_tasks',
python_callable=count_tasks_function,
dag=dag,
xcom_push=True,
provide_context=True
)
# Generates tasks and chains them
def dynamic_spawn_func(parent_dag_name, child_dag_name, start_date, args, **kwargs):
subdag = DAG(
dag_id=f"{parent_dag_name}.{child_dag_name}",
default_args=args,
start_date=start_date,
schedule_interval=None
)
# Here is the problem, the following variable cannot be used in a loop to spawn tasks
number_of_tasks = kwargs['ti'].xcom_pull(dag_id='spawn_dag', task_ids='count_number_of_tasks') …Run Code Online (Sandbox Code Playgroud)