无法导入Airflow插件

Chr*_*son 23 airflow

继气流教程这里.

问题:Web服务器返回以下错误

Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name 
MyFirstOperator
Run Code Online (Sandbox Code Playgroud)

注意: 目录结构如下所示:

airflow_home
??? airflow.cfg
??? airflow.db
??? dags
?   ??? test_operators.py  
??? plugins
?   ??? my_operators.py   
??? unittests.cfg
Run Code Online (Sandbox Code Playgroud)

我试图在'test_operators.py'中导入插件,如下所示:

from airflow.operators import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)

代码与教程中的代码完全相同.

Bjo*_*orn 15

在与 Airflow 文档苦苦挣扎并在这里尝试了一些答案但没有成功之后,我从 astronomer.io找到了这种方法

正如他们指出的那样,构建 Airflow 插件可能会令人困惑,而且可能不是添加钩子和操作符的最佳方式。

自定义挂钩和运算符是扩展 Airflow 以满足您的需求的强大方法。然而,对于实现它们的最佳方式存在一些混淆。根据 Airflow 文档,可以使用 Airflow 的插件机制添加它们。然而,这使问题过于复杂,并导致许多人感到困惑。Airflow 甚至考虑弃用插件机制来处理钩子和操作符。

因此,我没有使用插件 API,而是按照 Astronomer 的方法设置 Airflow,如下所示。

dags
??? my_dag.py               (contains dag and tasks)
plugins
??? __init__.py
??? hooks
?   ??? __init__.py
?   ??? mytest_hook.py      (contains class MyTestHook)
??? operators
    ??? __init__.py
    ??? mytest_operator.py  (contains class MyTestOperator)
Run Code Online (Sandbox Code Playgroud)

使用这种方法,我的操作符和钩子的所有代码都完全存在于它们各自的文件中 - 并且没有令人困惑的插件文件。所有__init__.py文件都是空的(与将插件代码放入其中一些同样令人困惑的方法不同)。

对于import需要的s,请考虑 Airflow 实际上如何使用 plugins 目录:

当 Airflow 运行时,它会将 dags/、plugins/ 和 config/ 添加到 PATH

这意味着这样做from airflow.operators.mytest_operator import MyTestOperator可能不会奏效。取而代之的from operators.mytest_operator import MyTestOperator是要走的路(注意from directory/file.py import Class上面设置中的对齐方式)。

我的文件中的工作片段如下所示。

my_dag.py:

from airflow import DAG
from operators.mytest_operator import MyTestOperator
default_args = {....}
dag = DAG(....)
....
mytask = MyTestOperator(task_id='MyTest Task', dag=dag)
....
Run Code Online (Sandbox Code Playgroud)

my_operator.py:

from airflow.models import BaseOperator
from hooks.mytest_hook import MyTestHook

class MyTestOperator(BaseOperator):
    ....
    hook = MyTestHook(....)
    ....
Run Code Online (Sandbox Code Playgroud)

my_hook.py:

class MyTestHook():
    ....
Run Code Online (Sandbox Code Playgroud)

这对我有用,并且比尝试子类化 AirflowPlugin 要简单得多。但是,如果您想更改网络服务器 UI,它可能不适合您:

注意:插件机制仍然必须用于对网络服务器 UI 进行更改的插件。

顺便说一句,我在此之前遇到的错误(现已解决):

ModuleNotFoundError: No module named 'mytest_plugin.hooks.mytest_hook'
ModuleNotFoundError: No module named 'operators.mytest_plugin'
Run Code Online (Sandbox Code Playgroud)

  • 我认为这是最好的方法。调度程序和/或网络服务器是否需要重新启动?我没有看到 astronomer.io 文章中提到的? (2认同)

smb*_*aei 9

Airflow 版本 2引入了一种新的插件管理机制,如其官方文档所述:

版本 2.0 中的更改:不再支持通过气流导入插件中添加的运算符、传感器、钩子。{operators,sensors, hooks}.<plugin_name> 不再支持,这些扩展应该作为常规 python 模块导入。有关更多信息,请参阅:模块管理和创建自定义运算符

管理 Python 代码所需要做的就是将代码放入plugins文件夹中,然后从此时开始寻址文件。假设您已在位于 path 的文件中编写了TestClass,在 dag 文件中您可以通过以下方式导入它:test.py$AIRFLOW_HOME/plugins/t_plugin/operators/test.py

from t_plugin.operators.test import TestClass
Run Code Online (Sandbox Code Playgroud)


jhn*_*lvr 8

在文章中它是这样的:

class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]
Run Code Online (Sandbox Code Playgroud)

而是使用:

class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of class(es) derived from BaseExecutor
    executors = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of objects created from a class derived
    # from flask_admin.BaseView
    admin_views = []
    # A list of Blueprint object created from flask.Blueprint
    flask_blueprints = []
    # A list of menu links (flask_admin.base.MenuLink)
    menu_links = []
Run Code Online (Sandbox Code Playgroud)

也不要使用:

from airflow.operators import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)

根据关于插件的气流文章,它应该是:

from airflow.operators.my_first_plugin import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)

如果这不起作用,请尝试:

from airflow.operators.my_operators import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)

如果这不起作用,请在启动时检查您的 Web 服务器登录以获取更多信息。

  • 谢谢,我已经试过了 - 在导入时,它会引发“没有名为“my_first_plugin”、“my_operators”的模块。 (4认同)
  • 对于 1.8,您可以在 [源代码](https://github.com/apache/incubator-airflow/blob/32a26d84b679a54add43092d0bdb77350dcbaeaf/airflow/operators/__init__.py#L102) 中找到此提示:Importing plugin operator ...直接来自 'airflow.operators' 已被弃用。请改为从 'airflow.operators.[plugin_module]' 导入。Airflow 2.0 中将完全取消对直接导入的支持。 (2认同)
  • AirflowPlugin 的子类的 name 属性将成为模块名称。例如,如果`name = "my_first_plugin"` 则在dag 中使用`fromairflow.operators.my_first_plugin import MyFirstOperator`。`my_first_plugin` 肯定行不通。正如@ChristophHösler 所提到的,旧方法`fromairflow.operators import MyFirstOperator` 有效,但会因污染命名空间而被删除。新方式:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/__init__.py#L107 和旧方式 https://github.com/apache/incubator-airflow/blob/master /airflow/operators/__init__.py#L116 (2认同)
  • 截至今天,使用气流 1.10,格式“来自气流.operators import MyFirstOperator”已为我加载传感器。 (2认同)

小智 7

我使用气流1.10。如果您要导入的是自定义运算符,则可以将其上载到airflow plugins文件夹,然后在DAG中将导入指定为:

[文件名] 导入 [类名]

其中:filename是您的插件文件的名称classname是您的类的名称。

例如:如果文件名是my_first_plugin,而类名是MyFirstOperator ,则导入将是:

my_first_plugin导入MyFirstOperator

在使用气流1.10时为我工作

谢谢 !希望这可以帮助 !!

  • 虽然这有效并且显然更简单,但我想知道为什么 Airflow 推荐插件机制,即有一个带有 `class MyPlugin(AirflowPlugin): name = 'my_first_plugin' Operators = [MyFirstOperator]` 的 `plugins/__init__.py` 唯一的“优势” “我看到的是,然后您可以将插件导入为 `from airflow.operators.my_first_plugin import MyFirstOperator` (2认同)

Chr*_*son 6

我重新启动了网络服务器,现在一切正常。

以下是我认为可能发生的情况:

  1. 在开始使用教程示例之前,我尝试运行自己的插件和 dag。第一次运行时我修复了一个小语法错误,但是在修复后我开始收到“无法导入名称”错误。
  2. 我删除了插件和 dag,并尝试使用教程中的插件来查看发生了什么。

我的猜测是第 1 步的错误以某种方式影响了第 2 步。

  • 根据我的经验,当您添加/修改任何插件时,您需要重新启动网络服务器。 (10认同)