我正在尝试在许多 DAG 中导入脚本以多次调用相同的操作。应用这种解决方案的最佳方法是什么?
现在我的文件夹结构如下:
dags/
|-- some_dags_folder/
|---- some_dag.py
|-- other_dags_folder/
|---- another_dag.py
|-- utils/
|---- util_slack.py
Run Code Online (Sandbox Code Playgroud)
当我尝试导入util_slack文件时,我将以下代码放入 DAG 代码中,例如假设代码来自some_dag.py:
from ..utils.util_slack import some_function
将所有内容放入 Airflow 后,出现以下错误:
Broken DAG: [/usr/local/airflow/dags/some_dags_folder/some_dag.py] attempted relative import with no known parent package
该util_slack脚本是一个用于发送成功消息或失败消息的文件,它看起来像这样
dags/
|-- some_dags_folder/
|---- some_dag.py
|-- other_dags_folder/
|---- another_dag.py
|-- utils/
|---- util_slack.py
Run Code Online (Sandbox Code Playgroud)
我的想法是,我可以将util_slack模块或任何其他自制模块导入到多个 DAG 中,并调用我需要的函数
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.hooks.base_hook import BaseHook
CHANNEL = BaseHook.get_connection('Slack').login
TOKEN = BaseHook.get_connection('Slack').password
def slack_success(context):
...
alterHook = SlackWebhookOperator(...)
return alterHook.execut(context=context)
def slack_fail(context):
...
alterHook = SlackWebhookOperator(...)
return alterHook.execut(context=context)
Run Code Online (Sandbox Code Playgroud)
这是最好的方法还是创建自定义插件(如https://airflow.apache.org/plugins.html中所示的插件)更好?
不确定plugins对于您的情况是否是一个好方法。Plugins将外部功能集成到Airflow核心(例如自定义端点、自定义登录/身份验证等)。
下面是我的方法。目前我有很多与ClickHouse. 所以我需要在不同的DAG\'s. 结构示例:
dags\n \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 lib # you can choose any your favorite name(utils, tools etc)\n \xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 ... just another common package / module\n \xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 default.py\n \xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 configurator.py\n \xe2\x94\x82\xc2\xa0\xc2\xa0 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 telegram.py\n \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 # dag1, dag2...dag_n\nRun Code Online (Sandbox Code Playgroud)\n\ndefault.py - 只是默认 DAG 参数
\n\nfrom lib.telegram import send_message\n\ndef on_success_callback(context):\n pass\n\n\ndef on_failure_callback(context):\n config = get_main_config()\n if not config.get(\'NOTIFY_ON_FAILURE\'):\n return\n send_message(\'failed blabla\')\n\n\ndef get_main_config():\n # I use variable with key \'MAIN_CONFIG\' to store some common settings for all dags\n return Variable.get(\'MAIN_CONFIG\', deserialize_json=True)\n\n\ndef get_default_args():\n return {\n \'email_on_failure\': False,\n \'email_on_retry\': False,\n \'on_failure_callback\': on_failure_callback,\n \'on_success_callback\': on_success_callback,\n # etc...\n }\nRun Code Online (Sandbox Code Playgroud)\n\nconfigurator.py - 所有必要的初始化都集中在一处。我使用注入,但您可以使用任何工具/方法这只是一个示例。
\n\nfrom lib.default import get_main_config\nfrom airflow.hooks.base_hook import BaseHook\n\n\nclass InstancesPool:\n def __init__(self, slack_connection, db_connection):\n self._db_connection = db_connection\n self._slack_connection = slack_connection\n\n def get_slack_connection(self):\n return self._slack_connection\n\n def get_db_connection():\n return self._db_connection\n\n\nclass DbConnection:\n # just an example\n def __init__(self, user, password):\n pass\n\n\ndef configure():\n config = get_main_config()\n\n return InstancesPool(\n BaseHook.get_connection(\'Slack\'),\n DbConnection(config[\'DB_USER\'], config[\'DB_PASSWORD\'])\n )\nRun Code Online (Sandbox Code Playgroud)\n\n这样您就不会遇到导入或初始化问题。您只需调用:
\n\nfrom lib.configurator import configure\n\n\ndef my_task(ds, **kwargs):\n pool = configure()\n # pool.get_slack_connection() etc...\nRun Code Online (Sandbox Code Playgroud)\n\n希望这可以帮助。
\n