Vin*_*eet (tags: python, python-3.x, airflow)
I am new to Airflow and am trying to build a DAG for text processing. The data pipeline consists of text-processing tasks: read a document, clean the text, and load the data into a JSON file. Each transformation task uses a custom operator, and the operators are kept in the `text_processing_plugin` folder. The full structure of the project folder is:
```
├── airflow.cfg
├── airflow.db
├── airflow-webserver.pid
├── dags
│   └── d0.py
├── plugins
│   └── text_processing_plugin
│       ├── __init__.py
│       └── operators
│           ├── dependency_parsing.py
│           ├── entity_detection.py
│           ├── __init__.py
│           ├── lemmatize.py
│           ├── pos_tagging.py
│           ├── remove_stop_words.py
│           └── tokenize_doc.py
├── requirements.txt
└── unittests.cfg
```

where `text_processing_plugin/__init__.py` contains the following code:
```python
from airflow.plugins_manager import AirflowPlugin
from text_processing_plugin.operators.dependency_parsing import DependencyParsingOperator
from text_processing_plugin.operators.entity_detection import DetectEntityOperator
from text_processing_plugin.operators.lemmatize import LemmatizeOperator
from text_processing_plugin.operators.pos_tagging import POSTagOperator
from text_processing_plugin.operators.remove_stop_words import RemoveStopWordsOperator
from text_processing_plugin.operators.tokenize_doc import DocTokenizerOperator

class TextProcessingPlugin(AirflowPlugin):
    name = "text_processing_plugin"
    operators = [DependencyParsingOperator, DetectEntityOperator, LemmatizeOperator,
                 POSTagOperator, RemoveStopWordsOperator, DocTokenizerOperator]
    sensors = []
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
    appbuilder_views = []
    appbuilder_menu_items = []
    global_operator_extra_links = []
    operator_extra_links = []
```

The DAG itself (`dags/d0.py`) follows the Airflow 1.x plugin-import pattern:
```python
import os
import json
import spacy
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Airflow 1.x style import of the plugin's operators:
from airflow.operators.text_processing_plugin import (
    DependencyParsingOperator, DetectEntityOperator, LemmatizeOperator,
    POSTagOperator, RemoveStopWordsOperator, DocTokenizerOperator)

sp = spacy.load('en_core_web_sm')

default_args = {
    'owner': 'episource',
    'depends_on_past': True,
    'start_date': datetime(2021, 3, 30),
    'retries': 0,
    'schedule_interval': '@once',
}

dag = DAG(
    'text_processing_dag',
    description='Text Processing Dag',
    default_args=default_args,
    catchup=False,
    tags=['text_processing'])

def read_doc(**kwargs):
    file_path = os.path.join(os.getcwd(), 'data', '1.txt')
    doc = open(file_path).read()
    return doc

def write_to_json(**kwargs):
    ti = kwargs['ti']
    with open(os.path.join(os.getcwd(), 'output', '1.json'), 'a+') as file:
        result_1 = ti.xcom_pull(task_ids='transform_tokenize_doc')
        result_2 = ti.xcom_pull(task_ids='transform_detect_entity')
        print('result 1 is ', result_1)
        print('result 2 is ', result_2)
        file.write(json.dumps(result_1))
        file.write(json.dumps(result_2))

extract = PythonOperator(
    task_id='extract',
    python_callable=read_doc,
    dag=dag)

t11_tokenize_doc = DocTokenizerOperator(
    sp=sp,
    task_id='transform_tokenize_doc',
    dag=dag,
    name='Sentence Tokenizing',
    pool='t1',
    task_concurrency=2)

t12_detect_entities = DetectEntityOperator(
    sp=sp,
    task_id='transform_detect_entity',
    dag=dag,
    name='Entity Detection',
    pool='t1',
    task_concurrency=2)

load = PythonOperator(
    task_id='load',
    python_callable=write_to_json,
    dag=dag)

extract >> [t11_tokenize_doc, t12_detect_entities] >> load
```

When I try to run the code, I get:
```
Traceback (most recent call last):
  File "dags/d0.py", line 8, in <module>
    from airflow.operators.text_processing_plugin import DependencyParsingOperator, DetectEntityOperator, LemmatizeOperator, POSTagOperator, RemoveStopWordsOperator, DocTokenizerOperator
ModuleNotFoundError: No module named 'airflow.operators.text_processing_plugin'
```

I have gone through several existing answers on Stack Overflow but could not resolve the error. Any hints would be appreciated.
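For context on the failing import: in Airflow 2.x the `airflow.operators.<plugin_name>` import path for plugin-provided operators was removed, and the configured plugins folder is instead added to `sys.path`, so the package is imported directly by its folder name. A minimal stand-alone sketch of that `sys.path` mechanism, using a throwaway temp directory in place of the real plugins folder and a stub `DocTokenizerOperator` (both are placeholders, not the real plugin):

```python
import os
import sys
import tempfile

# Stand-in for the Airflow plugins folder; Airflow 2.x puts the real one on sys.path.
plugins = tempfile.mkdtemp()
pkg = os.path.join(plugins, "text_processing_plugin", "operators")
os.makedirs(pkg)

# Make text_processing_plugin and text_processing_plugin.operators importable packages.
open(os.path.join(plugins, "text_processing_plugin", "__init__.py"), "w").close()
open(os.path.join(pkg, "__init__.py"), "w").close()

# Stub module standing in for the real tokenize_doc.py.
with open(os.path.join(pkg, "tokenize_doc.py"), "w") as f:
    f.write("class DocTokenizerOperator:\n    pass\n")

sys.path.insert(0, plugins)  # what Airflow does with the plugins folder

# Direct package import, instead of airflow.operators.text_processing_plugin:
from text_processing_plugin.operators.tokenize_doc import DocTokenizerOperator

print(DocTokenizerOperator.__name__)  # prints: DocTokenizerOperator
```

The same direct-import line works inside a DAG file once the package lives in the real plugins folder.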