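I have a simple DAG that uses a branch operator to check whether y is False. If it is, the DAG should move on to the say_goodbye task group. If it is True, it should skip the group and go straight to finish_dag_step. Here is the DAG: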
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup


def which_step() -> str:
    y = False
    if not y:
        return 'say_goodbye'
    else:
        return 'finish_dag_step'


with DAG(
    'my_test_dag',
    start_date = datetime(2022, 5, 14),
    schedule_interval = '0 0 * * *',
    catchup = True) as dag:

    say_hello = BashOperator(
        task_id = 'say_hello',
        retries = 3,
        bash_command = 'echo "hello world"'
    )

    run_which_step = BranchPythonOperator(
        task_id = 'run_which_step',
        python_callable = which_step,
        retries = 3,
        retry_exponential_backoff = True,
        retry_delay = timedelta(seconds = 5)
    )

    with TaskGroup('say_goodbye') as say_goodbye:
        for i in range(0, 2):
            step = BashOperator(
                task_id = 'step_' + str(i),
                retries = 3,
                bash_command = 'echo "goodbye world"'
            )
            step

    finish_dag_step = BashOperator(
        task_id = 'finish_dag_step',
        retries = 3,
        bash_command = 'echo "dag is finished"'
    )

    say_hello >> run_which_step
    run_which_step >> say_goodbye >> finish_dag_step
    run_which_step >> finish_dag_step
    finish_dag_step
When the DAG reaches run_which_step, I get the following error:
I don't understand what is causing this. What exactly is going on?
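You can't branch to a TaskGroup: the branch callable has to return the IDs of tasks, not the ID of a group. So you must refer to the tasks by their task_id, which for a task inside a group is the group's name and the task's ID joined by a dot (task_group.task_id).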
Your branch function should return something like this:
def which_step():
    y = False
    if not y:
        # Tasks inside a TaskGroup are addressed as "<group_id>.<task_id>"
        return [f'say_goodbye.step_{i}' for i in range(0, 2)]
    return 'finish_dag_step'
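The simplest approach, though, is probably not to return a list of task IDs like this, but to put a DummyOperator upstream of the TaskGroup. It effectively acts as the entry point for the whole group, so the branch callable only has to return that one task's ID.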