解释depends_on_past 功能

Ste*_*hen 7 airflow

我正在学习 Airflow,我查看了 Airflow 附带的示例 DAG 之一(example_branch_python_dop_operator_3.py)

在此示例中,如果(执行日期时间的)分钟是偶数,则 DAG 会分支到一个分支,如果分钟是奇数,则 DAG 会分支到另一个分支。此外,DAG 已depends_on_past设置True为所有任务的默认值。完整代码是:

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': True,
}

# BranchPython operator that depends on past
# and where tasks may run or be skipped on
# alternating runs
dag = DAG(
    dag_id='example_branch_dop_operator_v3',
    schedule_interval='*/1 * * * *',
    default_args=args,
)


def should_run(**kwargs):
    print('------------- exec dttm = {} and minute = {}'.
          format(kwargs['execution_date'], kwargs['execution_date'].minute))
    if kwargs['execution_date'].minute % 2 == 0:
        return "dummy_task_1"
    else:
        return "dummy_task_2"


cond = BranchPythonOperator(
    task_id='condition',
    provide_context=True,
    python_callable=should_run,
    dag=dag,
)

dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
cond >> [dummy_task_1, dummy_task_2]
Run Code Online (Sandbox Code Playgroud)

我本来期望,因为depends_on_past是 True,在第一次 DAG 运行之后,任务将不再能够启动。每个任务都会查看前一个任务的状态,并看到它是skipped,这不是成功,并且基本上没有状态就挂起。

然而,事实并非如此。以下是树视图中的结果:

在此处输入图片说明

如您所见,所有选定的任务都在每次 DAG 运行中运行。为什么会这样?我误解了什么depends_on_past意思吗?我以为每个任务在之前的 DAG 运行中查看具有相同 task_id 的任务的状态。

为了让它运行,我只是在主界面中打开了 DAG,所以我相信这些是预定的运行。

Ily*_*rov 5

来自版本 Airflow 1.7.1 的变更日志,2016-05-19

- Treat SKIPPED and SUCCESS the same way when evaluating depends_on_past=True
Run Code Online (Sandbox Code Playgroud)

看起来这里检查了条件:

airflow/ti_deps/deps/prev_dagrun_dep.py (master brunch)

line 75: if previous_ti.state not in {State.SKIPPED, State.SUCCESS}:
Run Code Online (Sandbox Code Playgroud)