在 Airflow 2 [Cloud Composer] 中导入自定义插件

juw*_*a92 5 python airflow google-cloud-composer

我有这样的目录结构:

\n
airflow_dags\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dags\n\xe2\x94\x82   \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 hk  \n\xe2\x94\x82       \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 hk_dag.py  \n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 plugins\n\xe2\x94\x82   \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 cse   \n\xe2\x94\x82       \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 operators.py   \n\xe2\x94\x82           \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 cse_to_bq.py   \n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 test\n   \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dags   \n       \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dag_test.py  \n
Run Code Online (Sandbox Code Playgroud)\n

在 Cloud Composer 创建的 GCS 存储桶中,有一个插件文件夹,我将cse文件夹上传到其中。

\n

现在在我的hk_dag.py文件中,如果我像这样导入插件:

\n
from plugins.cse.operators.cse_to_bq import CSEToBQOperator\n
Run Code Online (Sandbox Code Playgroud)\n

并运行我的单元测试,它通过了,但在云编辑器中我收到一条ModuleNotFoundError: No module named 'plugins'错误消息。

\n

如果我在我的中导入这样的插件hk_dag.py

\n
from cse.operators.cse_to_bq import CSEToBQOperator\n
Run Code Online (Sandbox Code Playgroud)\n

我的单元测试失败了,ModuleNotFoundError: No module named 'cse'但在 Cloud Composer 中运行良好。

\n

我该如何解决?

\n

小智 6

在 Airflow 2.0 中,要导入插件,您只需直接从操作员模块中执行即可。

\n

在你的情况下,必须是这样的:

\n
from operators.cse_to_bq import CSEToBQOperator\n
Run Code Online (Sandbox Code Playgroud)\n

但在此之前,您必须将文件夹结构更改为:

\n
airflow_dags\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dags\n\xe2\x94\x82   \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 hk  \n\xe2\x94\x82       \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 hk_dag.py  \n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 plugins\n\xe2\x94\x82   \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 operators   \n\xe2\x94\x82       \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 cse   \n\xe2\x94\x82           \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 cse_to_bq.py \n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 test\n   \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dags   \n       \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 dag_test.py \n
Run Code Online (Sandbox Code Playgroud)\n