Airflow 2.0 - 调度程序无法在 serialized_dag 表中找到序列化的 DAG

Div*_*aar 2 airflow airflow-scheduler

我在 dags 目录中有 2 个文件 - dag_1.py 和 dag_2.py

dag_1.py 创建静态 DAG,dag_2.py 基于某个位置的外部 json 文件创建动态 DAG。

静态 DAG(由 dag_1.py 创建)包含稍后阶段的任务,该任务为 dag_2.py 生成其中一些输入 json 文件,并以这种方式创建动态 DAG。

这曾经适用于未使用 DAG 序列化的 Airflow 1.x 版本。但随着 Airflow 2.0 DAG 序列化已成为强制性要求。有时,当生成动态 DAG 时,我会在调度程序中收到以下异常 -

[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
    self._create_dag_runs(query.all(), session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table
Run Code Online (Sandbox Code Playgroud)

在此之后,调度程序将终止,这是预期的。当我在此错误后手动检查表格时,我能够看到其中的 DAG 条目。

这个问题不是一直可以重现的。这可能是什么原因?是否有我应该尝试调整的气流配置?

Geo*_*rge 5

按以下顺序更新后,我们遇到了同样的问题:

  1. 1.10.12 -> 1.10.14
  2. 1.10.14 -> 2.0.0

我已经按照他们的指南进行了操作,直到在几个小时之后的某个随机点,调度程序开始崩溃,抱怨数据库中找不到随机 DAG,我们才遇到问题。

我们的部署过程涉及/opt/airflow/dags每次清除文件夹并进行全新安装(我们将 dag 和支持代码存储在 python 包中)

因此,在 1.10.x 版本中,我们时不时会遇到调度程序解析空文件夹并从数据库中擦除序列化 dag 的情况,但它始终能够在下次解析时恢复图片

显然在 2.0 中,作为使调度程序 HA 努力的一部分,他们将 DAG 处理器和调度程序完全分离。这导致了竞争条件:

  • 如果调度程序作业DAG 处理器更新serialized_dag表值之前命中数据库,则它什么也找不到并崩溃
  • 如果运气在你这一边,上述情况不会发生,你也不会看到这个异常

为了摆脱这个问题,我通过is_paused在数据库中更新来禁用所有 DAG 的调度,重新启动调度程序,一旦它生成了序列化的 dag,就把所有的 dag 重新打开