动态生成任务时,我需要让Task 2依赖于Task 1,Task1 >> Task 2或task2.set_upstream(task1)。
由于task_ids已评估,或者似乎是预先确定的,因此我无法提前设置依赖关系,将不胜感激。
Component(I)任务可以很好地运行,只不过它们一次运行即可。
for i in range(1,10):
task_id='Component'+str(i)
task_id = BashOperator(
task_id='Component'+str(i),
bash_command="echo {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
xcom_push=True,
dag=dag)
?????.set_upstream(??????)
Run Code Online (Sandbox Code Playgroud)
你可以遵循这样的模式:
with dag:
d1 = DummyOperator(task_id='kick_off_dag')
for i in range(0, 5):
d2 = DummyOperator(task_id='generate_data_{0}'.format(i))
d1 >> d2
Run Code Online (Sandbox Code Playgroud)
这将在 d1 下游生成 5 个任务。
使用以下代码:
a = []
for i in range(0,10):
a.append(BashOperator(
task_id='Component'+str(i),
bash_command="echo {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
xcom_push=True,
dag=dag))
if i not in [0]:
a[i-1] >> a[i]
Run Code Online (Sandbox Code Playgroud)
使用DummyOperator,代码如下所示:
a = []
for i in range(0,10):
a.append(DummyOperator(
task_id='Component'+str(i),
dag=dag))
if i not in [0]:
a[i-1] >> a[i]
Run Code Online (Sandbox Code Playgroud)
这将生成以下DAG:
| 归档时间: |
|
| 查看次数: |
1256 次 |
| 最近记录: |