气流在单个DAG中生成动态任务,任务N + 1取决于TaskN

use*_*397 4 python airflow

动态生成任务时,我需要让Task 2依赖于Task 1,Task1 >> Task 2或task2.set_upstream(task1)。

由于task_ids已评估,或者似乎是预先确定的,因此我无法提前设置依赖关系,将不胜感激。

Component(I)任务可以很好地运行,只不过它们一次运行即可。

for i in range(1,10):
  task_id='Component'+str(i)
  task_id = BashOperator(
  task_id='Component'+str(i),
  bash_command="echo  {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
  xcom_push=True,
  dag=dag) 
  ?????.set_upstream(??????)
Run Code Online (Sandbox Code Playgroud)

Vir*_*ekh 7

你可以遵循这样的模式:

with dag:

d1 = DummyOperator(task_id='kick_off_dag')

for i in range(0, 5):
    d2 = DummyOperator(task_id='generate_data_{0}'.format(i))
    d1 >> d2
Run Code Online (Sandbox Code Playgroud)

这将在 d1 下游生成 5 个任务。

  • 这可以在 d1 之后并行运行 5 个任务,我正在寻找的是让任务按顺序运行,如下所示:d1 >> generate_data_1 >> generate_data_2 >> generate_data_3 >> generate_data_4 >> generate_data_5 (3认同)

kax*_*xil 6

使用以下代码:

a = []
for i in range(0,10):
    a.append(BashOperator(
        task_id='Component'+str(i),
        bash_command="echo  {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
        xcom_push=True,
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]
Run Code Online (Sandbox Code Playgroud)

使用DummyOperator,代码如下所示:

a = []
for i in range(0,10):
    a.append(DummyOperator(
        task_id='Component'+str(i),
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]
Run Code Online (Sandbox Code Playgroud)

这将生成以下DAG:

在此处输入图片说明

  • 谢谢 !正是我所寻找的,如上所述,效果完美。要让动态步骤在另一个步骤之后执行,只需添加 [0].set_upstream(somePreviousTask) (3认同)