继气流教程这里.
问题:Web服务器返回以下错误
Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name
MyFirstOperator
Run Code Online (Sandbox Code Playgroud)
注意: 目录结构如下所示:
airflow_home
??? airflow.cfg
??? airflow.db
??? dags
? ??? test_operators.py
??? plugins
? ??? my_operators.py
??? unittests.cfg
Run Code Online (Sandbox Code Playgroud)
我试图在'test_operators.py'中导入插件,如下所示:
from airflow.operators import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)
代码与教程中的代码完全相同.
Bjo*_*orn 15
在与 Airflow 文档苦苦挣扎并在这里尝试了一些答案但没有成功之后,我从 astronomer.io找到了这种方法。
正如他们指出的那样,构建 Airflow 插件可能会令人困惑,而且可能不是添加钩子和操作符的最佳方式。
自定义挂钩和运算符是扩展 Airflow 以满足您的需求的强大方法。然而,对于实现它们的最佳方式存在一些混淆。根据 Airflow 文档,可以使用 Airflow 的插件机制添加它们。然而,这使问题过于复杂,并导致许多人感到困惑。Airflow 甚至考虑弃用插件机制来处理钩子和操作符。
因此,我没有使用插件 API,而是按照 Astronomer 的方法设置 Airflow,如下所示。
dags
??? my_dag.py (contains dag and tasks)
plugins
??? __init__.py
??? hooks
? ??? __init__.py
? ??? mytest_hook.py (contains class MyTestHook)
??? operators
??? __init__.py
??? mytest_operator.py (contains class MyTestOperator)
Run Code Online (Sandbox Code Playgroud)
使用这种方法,我的操作符和钩子的所有代码都完全存在于它们各自的文件中 - 并且没有令人困惑的插件文件。所有__init__.py文件都是空的(与将插件代码放入其中一些同样令人困惑的方法不同)。
对于import需要的s,请考虑 Airflow 实际上如何使用 plugins 目录:
当 Airflow 运行时,它会将 dags/、plugins/ 和 config/ 添加到 PATH
这意味着这样做from airflow.operators.mytest_operator import MyTestOperator可能不会奏效。取而代之的from operators.mytest_operator import MyTestOperator是要走的路(注意from directory/file.py import Class上面设置中的对齐方式)。
我的文件中的工作片段如下所示。
my_dag.py:
from airflow import DAG
from operators.mytest_operator import MyTestOperator
default_args = {....}
dag = DAG(....)
....
mytask = MyTestOperator(task_id='MyTest Task', dag=dag)
....
Run Code Online (Sandbox Code Playgroud)
my_operator.py:
from airflow.models import BaseOperator
from hooks.mytest_hook import MyTestHook
class MyTestOperator(BaseOperator):
....
hook = MyTestHook(....)
....
Run Code Online (Sandbox Code Playgroud)
my_hook.py:
class MyTestHook():
....
Run Code Online (Sandbox Code Playgroud)
这对我有用,并且比尝试子类化 AirflowPlugin 要简单得多。但是,如果您想更改网络服务器 UI,它可能不适合您:
注意:插件机制仍然必须用于对网络服务器 UI 进行更改的插件。
顺便说一句,我在此之前遇到的错误(现已解决):
ModuleNotFoundError: No module named 'mytest_plugin.hooks.mytest_hook'
ModuleNotFoundError: No module named 'operators.mytest_plugin'
Run Code Online (Sandbox Code Playgroud)
Airflow 版本 2引入了一种新的插件管理机制,如其官方文档所述:
版本 2.0 中的更改:不再支持通过气流导入插件中添加的运算符、传感器、钩子。{operators,sensors, hooks}.<plugin_name> 不再支持,这些扩展应该作为常规 python 模块导入。有关更多信息,请参阅:模块管理和创建自定义运算符
管理 Python 代码所需要做的就是将代码放入plugins文件夹中,然后从此时开始寻址文件。假设您已在位于 path 的文件中编写了TestClass,在 dag 文件中您可以通过以下方式导入它:test.py$AIRFLOW_HOME/plugins/t_plugin/operators/test.py
from t_plugin.operators.test import TestClass
Run Code Online (Sandbox Code Playgroud)
在文章中它是这样的:
class MyFirstPlugin(AirflowPlugin):
name = "my_first_plugin"
operators = [MyFirstOperator]
Run Code Online (Sandbox Code Playgroud)
而是使用:
class MyFirstPlugin(AirflowPlugin):
name = "my_first_plugin"
operators = [MyFirstOperator]
# A list of class(es) derived from BaseHook
hooks = []
# A list of class(es) derived from BaseExecutor
executors = []
# A list of references to inject into the macros namespace
macros = []
# A list of objects created from a class derived
# from flask_admin.BaseView
admin_views = []
# A list of Blueprint object created from flask.Blueprint
flask_blueprints = []
# A list of menu links (flask_admin.base.MenuLink)
menu_links = []
Run Code Online (Sandbox Code Playgroud)
也不要使用:
from airflow.operators import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)
from airflow.operators.my_first_plugin import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)
如果这不起作用,请尝试:
from airflow.operators.my_operators import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)
如果这不起作用,请在启动时检查您的 Web 服务器登录以获取更多信息。
小智 7
我使用气流1.10。如果您要导入的是自定义运算符,则可以将其上载到airflow plugins文件夹,然后在DAG中将导入指定为:
从 [文件名] 导入 [类名]
其中:filename是您的插件文件的名称classname是您的类的名称。
例如:如果文件名是my_first_plugin,而类名是MyFirstOperator ,则导入将是:
从my_first_plugin导入MyFirstOperator
在使用气流1.10时为我工作
谢谢 !希望这可以帮助 !!
我重新启动了网络服务器,现在一切正常。
以下是我认为可能发生的情况:
我的猜测是第 1 步的错误以某种方式影响了第 2 步。
| 归档时间: |
|
| 查看次数: |
16179 次 |
| 最近记录: |