这实际上是两个问题合二为一。
我AIRFLOW_HOME的结构像
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
Run Code Online (Sandbox Code Playgroud)
我一直在关注 astronomer.io 的示例https://github.com/airflow-plugins。我的自定义operators使用我的自定义hooks,所有导入都是相对于顶级文件夹的plugins。
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试将整个存储库移动到 plugins 文件夹时,运行后出现导入错误,airflow list_dags说plugins找不到。
我阅读了一些关于它的内容,显然 Airflow 将插件加载到其核心模块中,以便它们可以像这样导入
# my_operator.py
from plugins.hooks.my_hook import MyHook
Run Code Online (Sandbox Code Playgroud)
因此,我将所有导入更改为直接读取airflow.plugin_type。但是,我收到另一个导入错误,这次说my_hook找不到。我每次都重新启动我的工作人员、调度程序和网络服务器,但这似乎不是问题。我看过类似问题中提出的解决方案,但它们也不起作用。
官方文档也显示了这种扩展类的方式https://airflow.apache.org/plugins.htmlAirflowPlugin,但我不确定这个“接口”应该在哪里。我也更喜欢拖放选项。
最后,将我的代码存储库作为plugins文件夹本身显然没有意义,但是如果我将它们分开,测试就会变得不方便。每次在钩子/操作上运行单元测试时,是否都必须修改我的气流配置以指向我的存储库?测试自定义插件的最佳实践是什么?
abs*_*ted 17
我通过反复试验发现了这一点。这是我AIRFLOW_HOME文件夹的最终结构
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- plugin_name.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
Run Code Online (Sandbox Code Playgroud)
在plugin_name.py,我扩展了AirflowPlugin类
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- plugin_name.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
Run Code Online (Sandbox Code Playgroud)
在使用我的自定义钩子的自定义操作符中,我像这样导入它们
# plugin_name.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_hook import *
from operators.my_operator import *
from utils.my_utils import *
# etc
class PluginName(AirflowPlugin):
name = 'plugin_name'
hooks = [MyHook]
operators = [MyOperator]
macros = [my_util_func]
Run Code Online (Sandbox Code Playgroud)
然后在我的 DAG 文件中,我可以做
# my_operator.py
from hooks.my_hook import MyHook
Run Code Online (Sandbox Code Playgroud)
有必要重新启动网络服务器和调度程序。我花了一段时间才弄清楚。
这也有助于测试,因为自定义类中的导入是相对于文件夹中的子模块的plugins。我想知道我是否可以省略__init__.py里面的文件plugins,但由于一切正常,我没有尝试这样做。