Airflow Branch 操作员和任务组任务 ID 无效

fjj*_*s88 4 python airflow

我有一个简单的 dag,它使用分支运算符来检查 y 是否为 False。如果是,则该 dag 应该转移到 say_goodbye 任务组。如果为 True,则跳过并转到 finish_dag_step。这是达格:

def which_step() -> str:
  y = False
  if not y:
      return 'say_goodbye'
  else:
      return 'finish_dag_step'

with DAG(
  'my_test_dag',
  start_date = datetime(2022, 5, 14),
  schedule_interval = '0 0 * * *',
  catchup = True) as dag:

say_hello = BashOperator(
    task_id = 'say_hello',
    retries = 3,
    bash_command = 'echo "hello world"'
)

run_which_step = BranchPythonOperator(
    task_id = 'run_which_step',
    python_callable = which_step,
    retries = 3,
    retry_exponential_backoff = True,
    retry_delay = timedelta(seconds = 5)
)

with TaskGroup('say_goodbye') as say_goodbye:
    for i in range(0,2):
        step = BashOperator(
            task_id = 'step_' + str(i),
            retries = 3,
            bash_command = 'echo "goodbye world"'
            )

        step

finish_dag_step = BashOperator(
    task_id = 'finish_dag_step',
    retries = 3,
    bash_command = 'echo "dag is finished"'
)
say_hello >> run_which_step
run_which_step >> say_goodbye >> finish_dag_step
run_which_step >> finish_dag_step
finish_dag_step
Run Code Online (Sandbox Code Playgroud)

当 dag 命中 run_which_step 时,我收到以下错误:

在此输入图像描述 在此输入图像描述

我不明白是什么原因造成的。到底是怎么回事?

EDG*_*956 6

您无法创建任务对任务组的依赖关系。因此,您必须通过 来引用任务task_id,这是任务组的名称和任务的 ID,由点 ( task_group.task_id) 连接起来。

你的分支函数应该返回类似的内容

def branch():
    if condition:
        return [f'task_group.task_{i}' for i in range(0,2)]
    return 'default'
Run Code Online (Sandbox Code Playgroud)

但最简单的方法可能不是以这种方式返回任务 id 列表,而是将 DummyOperator 放在 TaskGroup 的上游。它可以有效地充当整个团队的切入点。