小编han*_*kuk的帖子

Airflow 1.10.15 动态任务创建

我正在尝试创建一个 DAG,它将根据前一个任务的结果生成 N 个任务。问题是我无法在 Operator 之外使用上一个任务(在 XCom 中)返回的值

有办法让这项工作发挥作用吗?

with DAG(
        "spawn_dag",
         start_date=datetime(2022, 1, 1)
    ) as dag:
    
    # Calculates the number of tasks based on some previous task run
    count_number_of_tasks = PythonOperator(
        task_id='count_number_of_tasks',
        python_callable=count_tasks_function,
        dag=dag,
        xcom_push=True,
        provide_context=True
    )

    # Generates tasks and chains them
    def dynamic_spawn_func(parent_dag_name, child_dag_name, start_date, args, **kwargs):
        subdag = DAG(
            dag_id=f"{parent_dag_name}.{child_dag_name}",
            default_args=args,
            start_date=start_date,
            schedule_interval=None
        )

        # Here is the problem, the following variable cannot be used in a loop to spawn tasks
        number_of_tasks = kwargs['ti'].xcom_pull(dag_id='spawn_dag', task_ids='count_number_of_tasks') …
Run Code Online (Sandbox Code Playgroud)

python directed-acyclic-graphs airflow

1
推荐指数
1
解决办法
222
查看次数

标签 统计

airflow ×1

directed-acyclic-graphs ×1

python ×1