在 Airflow 中编写和导入自定义插件

abs*_*ted 4 airflow

这实际上是两个问题合二为一。

AIRFLOW_HOME的结构像

airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils
Run Code Online (Sandbox Code Playgroud)

我一直在关注 astronomer.io 的示例https://github.com/airflow-plugins。我的自定义operators使用我的自定义hooks,所有导入都是相对于顶级文件夹的plugins

airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试将整个存储库移动到 plugins 文件夹时,运行后出现导入错误,airflow list_dagsplugins找不到。

我阅读了一些关于它的内容,显然 Airflow 将插件加载到其核心模块中,以便它们可以像这样导入

# my_operator.py
from plugins.hooks.my_hook import MyHook
Run Code Online (Sandbox Code Playgroud)

因此,我将所有导入更改为直接读取airflow.plugin_type。但是,我收到另一个导入错误,这次说my_hook找不到。我每次都重新启动我的工作人员、调度程序和网络服务器,但这似乎不是问题。我看过类似问题中提出的解决方案,但它们也不起作用。

官方文档也显示了这种扩展类的方式https://airflow.apache.org/plugins.htmlAirflowPlugin,但我不确定这个“接口”应该在哪里。我也更喜欢拖放选项。

最后,将我的代码存储库作为plugins文件夹本身显然没有意义,但是如果我将它们分开,测试就会变得不方便。每次在钩子/操作上运行单元测试时,是否都必须修改我的气流配置以指向我的存储库?测试自定义插件的最佳实践是什么?

abs*_*ted 17

我通过反复试验发现了这一点。这是我AIRFLOW_HOME文件夹的最终结构

airflow 
+-- dags 
+-- plugins
    +-- __init__.py
    +-- plugin_name.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py 
        +-- another_hook.py 
    +-- operators
        +-- __init__.py
        +-- my_operator.py 
        +-- another_operator.py 
    +-- sensors 
    +-- utils
Run Code Online (Sandbox Code Playgroud)

plugin_name.py,我扩展了AirflowPlugin

airflow 
+-- dags 
+-- plugins
    +-- __init__.py
    +-- plugin_name.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py 
        +-- another_hook.py 
    +-- operators
        +-- __init__.py
        +-- my_operator.py 
        +-- another_operator.py 
    +-- sensors 
    +-- utils
Run Code Online (Sandbox Code Playgroud)

在使用我的自定义钩子的自定义操作符中,我像这样导入它们

# plugin_name.py

from airflow.plugins_manager import AirflowPlugin
from hooks.my_hook import *
from operators.my_operator import *
from utils.my_utils import *
# etc

class PluginName(AirflowPlugin):

    name = 'plugin_name'

    hooks = [MyHook]
    operators = [MyOperator]
    macros = [my_util_func]
Run Code Online (Sandbox Code Playgroud)

然后在我的 DAG 文件中,我可以做

# my_operator.py

from hooks.my_hook import MyHook
Run Code Online (Sandbox Code Playgroud)

有必要重新启动网络服务器和调度程序。我花了一段时间才弄清楚。

这也有助于测试,因为自定义类中的导入是相对于文件夹中的子模块的plugins。我想知道我是否可以省略__init__.py里面的文件plugins,但由于一切正常,我没有尝试这样做。