我正在学习 Airflow,我查看了 Airflow 附带的示例 DAG 之一(example_branch_python_dop_operator_3.py)
在此示例中,如果(执行日期时间的)分钟是偶数,则 DAG 会分支到一个分支,如果分钟是奇数,则 DAG 会分支到另一个分支。此外,DAG 已depends_on_past
设置True
为所有任务的默认值。完整代码是:
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
'depends_on_past': True,
}
# BranchPython operator that depends on past
# and where tasks may run or be skipped on
# alternating runs
dag = DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
default_args=args,
)
def should_run(**kwargs):
print('------------- exec dttm = {} and minute = {}'.
format(kwargs['execution_date'], kwargs['execution_date'].minute))
if kwargs['execution_date'].minute % 2 == 0:
return "dummy_task_1"
else:
return "dummy_task_2"
cond = BranchPythonOperator(
task_id='condition',
provide_context=True,
python_callable=should_run,
dag=dag,
)
dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
cond >> [dummy_task_1, dummy_task_2]
Run Code Online (Sandbox Code Playgroud)
我本来期望,因为depends_on_past
是 True,在第一次 DAG 运行之后,任务将不再能够启动。每个任务都会查看前一个任务的状态,并看到它是skipped
,这不是成功,并且基本上没有状态就挂起。
然而,事实并非如此。以下是树视图中的结果:
如您所见,所有选定的任务都在每次 DAG 运行中运行。为什么会这样?我误解了什么depends_on_past
意思吗?我以为每个任务在之前的 DAG 运行中查看具有相同 task_id 的任务的状态。
为了让它运行,我只是在主界面中打开了 DAG,所以我相信这些是预定的运行。
来自版本 Airflow 1.7.1 的变更日志,2016-05-19
- Treat SKIPPED and SUCCESS the same way when evaluating depends_on_past=True
Run Code Online (Sandbox Code Playgroud)
看起来这里检查了条件:
airflow/ti_deps/deps/prev_dagrun_dep.py (master brunch)
line 75: if previous_ti.state not in {State.SKIPPED, State.SUCCESS}:
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1173 次 |
最近记录: |