Airflow 2 - ModuleNotFoundError: No module named 'airflow.operators.text_processing_plugin'

Vin*_*eet 5 python python-3.x airflow

I am new to airflow and am trying to build a DAG for processing text. I have a data pipeline made up of text processing tasks - read a document, clean the text, and load the data into a JSON file. Each transform task uses a custom operator, and these operators are kept in the text_processing_plugin folder. The complete folder structure of the plugins folder is:

├── airflow.cfg
├── airflow.db
├── airflow-webserver.pid
├── dags
│   ├── d0.py
├── plugins
│   └── text_processing_plugin
│       ├── __init__.py
│       ├── operators
│       │   ├── dependency_parsing.py
│       │   ├── entity_detection.py
│       │   ├── __init__.py
│       │   ├── lemmatize.py
│       │   ├── pos_tagging.py
│       │   ├── remove_stop_words.py
│       │   └── tokenize_doc.py
├── requirements.txt
├── unittests.cfg

where text_processing_plugin/__init__.py has the following code:

from airflow.plugins_manager import AirflowPlugin
from text_processing_plugin.operators.dependency_parsing import DependencyParsingOperator
from text_processing_plugin.operators.entity_detection import DetectEntityOperator
from text_processing_plugin.operators.lemmatize import LemmatizeOperator
from text_processing_plugin.operators.pos_tagging import POSTagOperator
from text_processing_plugin.operators.remove_stop_words import RemoveStopWordsOperator
from text_processing_plugin.operators.tokenize_doc import DocTokenizerOperator


class TextProcessingPlugin(AirflowPlugin):
    name = "text_processing_plugin"
    operators = [DependencyParsingOperator, DetectEntityOperator, LemmatizeOperator, POSTagOperator,
                 RemoveStopWordsOperator, DocTokenizerOperator]
    sensors = []
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
    appbuilder_views = []
    appbuilder_menu_items = []
    global_operator_extra_links = []
    operator_extra_links = []

To build the DAG, I followed the airflow 1.x pattern, as shown below:

import os
import json
import spacy
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from airflow.operators.text_processing_plugin import DependencyParsingOperator, DetectEntityOperator, LemmatizeOperator, POSTagOperator, RemoveStopWordsOperator, DocTokenizerOperator

sp = spacy.load('en_core_web_sm')

default_args = {
    'owner': 'episource',
    'depends_on_past': True,
    'start_date': datetime(2021, 3, 30),
    'retries': 0,
    'schedule_interval': '@once',
}

dag = DAG(
    'text_processing_dag',
    description='Text Processing Dag',
    default_args=default_args,
    catchup=False,
    tags=['text_processing'])


def read_doc(**kwargs):
    # Read the raw document that the transform tasks will process.
    file_path = os.path.join(os.getcwd(), 'data', '1.txt')
    doc = open(file_path).read()
    return doc


def write_to_json(**kwargs):
    # Pull the transform results from XCom and append them to a JSON file.
    ti = kwargs['ti']
    with open(os.path.join(os.getcwd(), 'output', '1.json'), 'a+') as file:
        result_1 = ti.xcom_pull(task_ids='transform_tokenize_doc')
        result_2 = ti.xcom_pull(task_ids='transform_detect_entity')
        print('result 1 is ', result_1)
        print('result 2 is ', result_2)
        file.write(json.dumps(result_1))
        file.write(json.dumps(result_2))


extract = PythonOperator(
    task_id='extract',
    python_callable=read_doc,
    dag=dag)

t11_tokenize_doc = DocTokenizerOperator(
    sp=sp,
    task_id="transform_tokenize_doc",
    dag=dag,
    name="Sentence Tokenizing",
    pool='t1',
    task_concurrency=2)

t12_detect_entities = DetectEntityOperator(
    sp=sp,
    task_id="transform_detect_entity",
    dag=dag,
    name="Entity Detection",
    pool='t1',
    task_concurrency=2)

load = PythonOperator(
    task_id='load',
    python_callable=write_to_json,
    dag=dag)

extract >> [t11_tokenize_doc, t12_detect_entities] >> load

When I try to run the code, I get:

Traceback (most recent call last):
  File "dags/d0.py", line 8, in <module>
    from airflow.operators.text_processing_plugin import DependencyParsingOperator, DetectEntityOperator, LemmatizeOperator, POSTagOperator, RemoveStopWordsOperator, DocTokenizerOperator
ModuleNotFoundError: No module named 'airflow.operators.text_processing_plugin'

I have gone through some existing answers on Stack Overflow but could not resolve the error. I would appreciate some pointers.


Dan*_*har 7

Changed in version 2.0: Importing operators, sensors, and hooks added in plugins via airflow.{operators,sensors,hooks}.<plugin_name> is no longer supported; these extensions should be imported as regular Python modules. For more information, see: Modules Management and Creating a custom Operator.
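
As a minimal sketch, assuming the folder layout from the question and that the plugins folder is the one configured in airflow.cfg (Airflow adds the dags, plugins and config folders to sys.path), the import in d0.py would become a plain package import instead of going through airflow.operators:

# Airflow 2.x: import the custom operators as regular Python modules.
# The package path mirrors plugins/text_processing_plugin/operators/.
from text_processing_plugin.operators.dependency_parsing import DependencyParsingOperator
from text_processing_plugin.operators.entity_detection import DetectEntityOperator
from text_processing_plugin.operators.lemmatize import LemmatizeOperator
from text_processing_plugin.operators.pos_tagging import POSTagOperator
from text_processing_plugin.operators.remove_stop_words import RemoveStopWordsOperator
from text_processing_plugin.operators.tokenize_doc import DocTokenizerOperator

With this approach the AirflowPlugin subclass in text_processing_plugin/__init__.py is no longer needed just to expose operators; plugins are only required for UI-level extensions such as views, blueprints, menu links, and operator extra links.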