MWAA Airflow 2.2.2“DAG”对象没有属性“update_relative”

Sal*_*rid 7 python amazon-web-services airflow mwaa snowflake-connector

因此,我将 DAG 从 Airflow 版本 1.12.15 升级到 2.2.2,并将 python 从 3.8 降级到 3.7(因为 MWAA 不支持 python 3.8)。DAG 在之前的设置中工作正常,但在 MWAA 设置中显示此错误:

Broken DAG: [/usr/local/airflow/dags/google_analytics_import.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1474, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1412, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'DAG' object has no attribute 'update_relative'
Run Code Online (Sandbox Code Playgroud)

这是似乎失败的内置函数:


def set_downstream(
        self,
        task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
        edge_modifier: Optional[EdgeModifier] = None,
    ) -> None:
        """
        Set a task or a task list to be directly downstream from the current
        task. Required by TaskMixin.
        """
        self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
Run Code Online (Sandbox Code Playgroud)

我们尝试在 DAG 中运行以下代码:

    for report in reports:
        dag << PythonOperator(
            task_id=f"task_{report}",
            python_callable=process,
            op_kwargs={
                "conn": "snowflake_production",
                "table": report,
            },
            provide_context=True,
        )
Run Code Online (Sandbox Code Playgroud)

我认为从 Python 3.8 到 3.7 的过渡导致了这个问题,但我不确定。

有人遇到过类似的问题吗?

Ela*_*lad 3

对于 Airflow>=2.0.0 不再支持使用按位移位(bit-shift)运算符将任务分配给 DAG。

尝试做:

dag = DAG("my_dag")
dummy = DummyOperator(task_id="dummy")

dag >> dummy
Run Code Online (Sandbox Code Playgroud)

不管用。

只能在操作员之间设置依赖关系。

您应该使用上下文管理器:

with DAG("my_dag") as dag:
    dummy = DummyOperator(task_id="dummy")
Run Code Online (Sandbox Code Playgroud)

它已经处理了运算符与 DAG 对象的关系。如果您不想这样做,则可以在运算符构造函数中使用 dag 参数,如下所示:DummyOperator(task_id="dummy", dag=dag)