Previously I used the following snippet to generate tasks dynamically:
dummy_start_task = PythonOperator(
    task_id="dummy_start",
    default_args=default_args,
    python_callable=dummy_start,
    dag=dag
)

make_images_tasks = list()
for n in range(WORKERS):
    globals()[f"make_images_{n}_task"] = PythonOperator(
        task_id=f'make_images_{n}',
        default_args=default_args,
        python_callable=make_images,
        op_kwargs={"n": n},
        dag=dag
    )
    make_images_tasks.append(globals()[f"make_images_{n}_task"])

dummy_collector_task = PythonOperator(
    task_id="dummy_collector",
    default_args=default_args,
    python_callable=dummy_collector,
    dag=dag
)

dummy_start_task >> make_images_tasks >> dummy_collector_task
# In the collector task I would use:
# items = task_instance.xcom_pull(task_ids=[f"make_images_{n}" for n in range(int(WORKERS))])
# to get the XComs from these dynamically generated tasks.
How can I achieve this with the TaskFlow API? (That is, generate multiple tasks and then fetch their XComs in a downstream collector task.)

Here is an example:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="example_taskflow", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:

    @task
    def dummy_start_task():
        pass

    tasks = []
    for n in range(3):

        @task(task_id=f"make_images_{n}")
        def images_task(i):
            return i

        tasks.append(images_task(n))

    @task
    def dummy_collector_task(tasks):
        print(tasks)

    dummy_start_task_ = dummy_start_task()
    dummy_start_task_ >> tasks
    dummy_collector_task(tasks)
The make_images_* tasks take 0, 1, and 2 as input (the same value is also used in each task's id) and return it. dummy_collector_task receives the outputs of all the make_images_* tasks and prints [0, 1, 2].
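As a variation on the same idea, the callable can be decorated once and given a new task_id on each loop iteration, instead of being redefined inside the loop. This is only a sketch, under some assumptions: it targets an Airflow 2.x release where decorated tasks expose `.override()` and `schedule_interval` is still accepted. The names `make_image`, `collect`, `build_dag`, the `WORKERS` value, and the dag_id are all hypothetical. Airflow is imported lazily inside `build_dag` so the plain functions can be exercised without a scheduler.

```python
from datetime import datetime

WORKERS = 3  # assumed worker count, matching range(3) in the example above


def make_image(i):
    """Plain callable standing in for the real image work; returns its input."""
    return i


def collect(results):
    """Receives the upstream return values as a list and prints them."""
    results = list(results)
    print(results)
    return results


def build_dag():
    """Wire the callables into a DAG (hypothetical helper).

    Airflow is imported here, not at module level, so make_image and
    collect stay importable and testable without Airflow installed.
    """
    from airflow import DAG
    from airflow.decorators import task

    with DAG(
        dag_id="example_taskflow_override",  # hypothetical dag_id
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,
    ) as dag:

        @task
        def dummy_start_task():
            pass

        # Decorate once, then give every copy its own task_id.
        make_image_task = task(make_image)
        images = [
            make_image_task.override(task_id=f"make_images_{n}")(n)
            for n in range(WORKERS)
        ]

        dummy_start_task() >> images
        # Passing the list of task outputs lets Airflow resolve the XComs.
        task(task_id="dummy_collector")(collect)(images)

    return dag
```

In a real DAG file you would add `dag = build_dag()` at module level so the scheduler can discover it. Outside Airflow, `collect([make_image(n) for n in range(WORKERS)])` exercises the same logic with plain Python values.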