如何在不同的DAG中使用通用脚本?

Est*_*rra 4 python airflow

我正在尝试在许多 DAG 中导入脚本以多次调用相同的操作。应用这种解决方案的最佳方法是什么?

现在我的文件夹结构如下:

dags/
|-- some_dags_folder/
|---- some_dag.py
|-- other_dags_folder/
|---- another_dag.py
|-- utils/
|---- util_slack.py
Run Code Online (Sandbox Code Playgroud)

当我尝试导入util_slack文件时,我将以下代码放入 DAG 代码中,例如假设代码来自some_dag.py

from ..utils.util_slack import some_function

将所有内容放入 Airflow 后,出现以下错误:

Broken DAG: [/usr/local/airflow/dags/some_dags_folder/some_dag.py] attempted relative import with no known parent package

util_slack脚本是一个用于发送成功消息或失败消息的文件,它看起来像这样

dags/
|-- some_dags_folder/
|---- some_dag.py
|-- other_dags_folder/
|---- another_dag.py
|-- utils/
|---- util_slack.py
Run Code Online (Sandbox Code Playgroud)

我的想法是,我可以将util_slack模块或任何其他自制模块导入到多个 DAG 中,并调用我需要的函数

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.hooks.base_hook import BaseHook

CHANNEL = BaseHook.get_connection('Slack').login
TOKEN = BaseHook.get_connection('Slack').password

def slack_success(context):
    ...
    alterHook = SlackWebhookOperator(...)
    return alterHook.execut(context=context)

def slack_fail(context):
    ...
    alterHook = SlackWebhookOperator(...)
    return alterHook.execut(context=context)
Run Code Online (Sandbox Code Playgroud)

这是最好的方法还是创建自定义插件(如https://airflow.apache.org/plugins.html中所示的插件)更好?

Dan*_*har 6

不确定plugins对于您的情况是否是一个好方法。Plugins将外部功能集成到Airflow核心(例如自定义端点、自定义登录/身份验证等)。

\n\n

下面是我的方法。目前我有很多与ClickHouse. 所以我需要在不同的DAG\'s. 结构示例:

\n\n
 dags\n    \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 lib  # you can choose any your favorite name(utils, tools etc)\n    \xe2\x94\x82   \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 ... just another common package / module\n    \xe2\x94\x82   \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 default.py\n    \xe2\x94\x82   \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 configurator.py\n    \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 telegram.py\n    \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 # dag1, dag2...dag_n\n
Run Code Online (Sandbox Code Playgroud)\n\n

default.py - 只是默认 DAG 参数

\n\n
from lib.telegram import send_message\n\ndef on_success_callback(context):\n    pass\n\n\ndef on_failure_callback(context):\n    config = get_main_config()\n    if not config.get(\'NOTIFY_ON_FAILURE\'):\n        return\n    send_message(\'failed blabla\')\n\n\ndef get_main_config():\n    # I use variable with key \'MAIN_CONFIG\' to store some common settings for all dags\n    return Variable.get(\'MAIN_CONFIG\', deserialize_json=True)\n\n\ndef get_default_args():\n    return {\n        \'email_on_failure\': False,\n        \'email_on_retry\': False,\n        \'on_failure_callback\': on_failure_callback,\n        \'on_success_callback\': on_success_callback,\n        # etc...\n    }\n
Run Code Online (Sandbox Code Playgroud)\n\n

configurator.py - 所有必要的初始化都集中在一处。我使用注入,但您可以使用任何工具/方法这只是一个示例。

\n\n
from lib.default import get_main_config\nfrom airflow.hooks.base_hook import BaseHook\n\n\nclass InstancesPool:\n    def __init__(self, slack_connection, db_connection):\n        self._db_connection = db_connection\n        self._slack_connection = slack_connection\n\n    def get_slack_connection(self):\n        return self._slack_connection\n\n    def get_db_connection():\n        return self._db_connection\n\n\nclass DbConnection:\n    # just an example\n    def __init__(self, user, password):\n        pass\n\n\ndef configure():\n    config = get_main_config()\n\n    return InstancesPool(\n        BaseHook.get_connection(\'Slack\'),\n        DbConnection(config[\'DB_USER\'], config[\'DB_PASSWORD\'])\n    )\n
Run Code Online (Sandbox Code Playgroud)\n\n

这样您就不会遇到导入或初始化问题。您只需调用:

\n\n
from lib.configurator import configure\n\n\ndef my_task(ds, **kwargs):\n    pool = configure()\n    # pool.get_slack_connection() etc...\n
Run Code Online (Sandbox Code Playgroud)\n\n

希望这可以帮助。

\n