我为不同的 python 项目设置了不同的气流 dags,即一个父 dags 文件夹/vol/dags,其中包含基于不同 python 项目的 DAG 子文件夹:/vol/dags/project1/project1.py, /vol/dags/project2/project2.pywhere DAGS_FOLDER = /vol/dags.
project1.py例如从同一目录中的另一个 python 文件中导入一个函数,即/vol/dags/project1/mycalculator.py. 但是当我启动气流网络服务器时,我得到一个ImportError:
/vol/dags/project1/$ airflow webserver -p 8080
INFO - Filling up the DagBag from /vol/dags/
ERROR - Failed to import: /vol/dags/project1/project1.py
Traceback (most recent call last):
File "/Users/xxx/anaconda/lib/python2.7/site-packages/airflow/models.py", line 247, in process_file
m = imp.load_source(mod_name, filepath)
File "/vol/dags/project1/project1.py", line 10, in <module>
from mycalculator import *
ImportError: No module named mycalculator
Run Code Online (Sandbox Code Playgroud)
我试图导入mycalculator.py到project1.py这样的:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from mycalculator import *
dag = DAG(
dag_id='project1', default_args=args,
schedule_interval="@once")
Run Code Online (Sandbox Code Playgroud)
小智 6
您可以使用打包的 dag概念为不同的项目设置不同的 dag 文件夹。您只需要将每个项目的 zip 放在父 dag 文件夹中。
通过这种方式,您可以轻松地将 dag 与其依赖项组合在一起,并且您的 dag 文件夹将整洁干净,因为它只包含每个项目的 zip。
您可以创建一个如下所示的 zip:
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
Run Code Online (Sandbox Code Playgroud)
您的父 dag 文件夹可能如下所示:
project1.zip
project2.zip
my_dag3.py
Run Code Online (Sandbox Code Playgroud)
这里同样的问题。
事实上,我们的导入有效,因为在 Airflow 上下文中,DAG_FOLDER 已添加到 PYTHONPATH 中。在project1/中添加init.py不会改变任何东西。
一个好的解决方案是使用相对导入,如
from .mycalculator import *
Run Code Online (Sandbox Code Playgroud)
但由于 Airflow 导入 Dags 的方式,相对导入现在无法工作(由气流开发人员向我解释)
因此,对我来说,最简单的解决方案是将 dags 文件保留在根目录中,并在它们前面加上“project1_”或“project2_”前缀,并将像 mycalculator 这样的库放在子文件夹中。
该文件夹/vol/dags/project1/缺少__init__.py文件。
该文件可以为空。
添加此文件,然后在 project2.py 中您应该能够执行以下操作:
import project1.mycalculator.*
Run Code Online (Sandbox Code Playgroud)
有关软件包的更多信息,请参阅此处: https ://docs.python.org/2/tutorial/modules.html#packages