Python:空间和内存消耗

Vic*_*DDT 9 python-3.x spacy

1 - 问题

我在 python 上使用“spacy”进行文本文档词形还原。有 500,000 个文档的大小高达 20 Mb 的干净文本。

问题如下:spacy 内存消耗随着时间的推移而增长,直到使用整个内存。

2 - 背景

我的硬件配置: CPU:Intel I7-8700K 3.7 GHz(12 核)内存:16 Gb SSD:板载 1 Tb GPU,但不用于此任务

我正在使用“多处理”在多个进程(工人)之间拆分任务。每个工作人员都会收到一份要处理的文件列表。主进程执行子进程的监视。我在每个子进程中启动一次“spacy”,并使用这个 spacy 实例来处理 worker 中的整个文档列表。

内存跟踪说明如下:

[内存跟踪 - 前 10 名]

/opt/develop/virtualenv/lib/python3.6/site-packages/thinc/neural/mem.py:68: size=45.1 MiB, count=99, average=467 KiB

/opt/develop/virtualenv/lib/python3.6/posixpath.py:149: size=40.3 MiB, count=694225, average=61 B

:487:大小=9550 KiB,计数=77746,平均值=126 B

/opt/develop/virtualenv/lib/python3.6/site-packages/dawg_python/wrapper.py:33: size=7901 KiB, count=6, average=1317 KiB

/opt/develop/virtualenv/lib/python3.6/site-packages/spacy/lang/en/lemmatizer/_nouns.py:7114: size=5273 KiB, count=57494, average=94 B

prepare_docs04.py:372:大小=4189 KiB,计数=1,平均值=4189 KiB

/opt/develop/virtualenv/lib/python3.6/site-packages/dawg_python/wrapper.py:93: size=3949 KiB, count=5, average=790 KiB

/usr/lib/python3.6/json/decoder.py:355: size=1837 KiB, count=20456, average=92 B

/opt/develop/virtualenv/lib/python3.6/site-packages/spacy/lang/en/lemmatizer/_adjectives.py:2828: size=1704 KiB, count=20976, average=83 B

prepare_docs04.py:373:大小=1633 KiB,计数=1,平均值=1633 KiB

3 - 期望

我看到了一个很好的建议来构建一个分离的服务器 - 客户端解决方案 [here]是否可以在内存中保持空间以减少加载时间?

是否可以使用“多处理”方法来控制内存消耗?

4 - 守则

这是我的代码的简化版本:

import os, subprocess, spacy, sys, tracemalloc
from multiprocessing import Pipe, Process, Lock
from time import sleep

# START: memory trace
tracemalloc.start()

# Load spacy
spacyMorph = spacy.load("en_core_web_sm")

#
# Get word's lemma
#
def getLemma(word):
    global spacyMorph
    lemmaOutput = spacyMorph(str(word))
    return lemmaOutput


#
# Worker's logic
#
def workerNormalize(lock, conn, params):
    documentCount = 1
    for filenameRaw in params[1]:
        documentTotal = len(params[1])
        documentID = int(os.path.basename(filenameRaw).split('.')[0])

        # Send to the main process the worker's current progress
        if not lock is None:
            lock.acquire()
            try:
                statusMessage = "WORKING:{:d},{:d},".format(documentID, documentCount)
                conn.send(statusMessage)
                documentCount += 1
            finally:
                lock.release()
        else:
            print(statusMessage)

        # ----------------
        # Some code is excluded for clarity sake
        # I've got a "wordList" from file "filenameRaw"
        # ----------------

        wordCount = 1
        wordTotalCount = len(wordList)

        for word in wordList:
            lemma = getLemma(word)
            wordCount += 1

        # ----------------
        # Then I collect all lemmas and save it to another text file
        # ----------------

        # Here I'm trying to reduce memory usage
        del wordList
        del word
        gc.collect()


if __name__ == '__main__':
    lock = Lock()
    processList = []

    # ----------------
    # Some code is excluded for clarity sake
    # Here I'm getting full list of files "fileTotalList" which I need to lemmatize
    # ----------------
    while cursorEnd < (docTotalCount + stepSize):
        fileList = fileTotalList[cursorStart:cursorEnd]

        # ----------------
        # Create workers and populate it with list of files to process
        # ----------------
        processData = {}
        processData['total'] = len(fileList)  # worker total progress
        processData['count'] = 0  # worker documents done count
        processData['currentDocID'] = 0  # current document ID the worker is working on
        processData['comment'] = ''  # additional comment (optional)
        processData['con_parent'], processData['con_child'] = Pipe(duplex=False)
        processName = 'worker ' + str(count) + " at " + str(cursorStart)
        processData['handler'] = Process(target=workerNormalize, name=processName, args=(lock, processData['con_child'], [processName, fileList]))

        processList.append(processData)
        processData['handler'].start()

        cursorStart = cursorEnd
        cursorEnd += stepSize
        count += 1

    # ----------------
    # Run the monitor to look after the workers
    # ----------------
    while True:
        runningCount = 0

        #Worker communication format:
        #STATUS:COMMENTS

        #STATUS:
        #- WORKING - worker is working
        #- CLOSED - worker has finished his job and closed pipe-connection

        #COMMENTS:
        #- for WORKING status:
        #DOCID,COUNT,COMMENTS
        #DOCID - current document ID the worker is working on
        #COUNT - count of done documents
        #COMMENTS - additional comments (optional)


        # ----------------
        # Run through the list of workers ...
        # ----------------
        for i, process in enumerate(processList):
            if process['handler'].is_alive():
                runningCount += 1

                # ----------------
                # .. and check if there is somethng in the PIPE
                # ----------------
                if process['con_parent'].poll():
                    try:
                        message = process['con_parent'].recv()
                        status = message.split(':')[0]
                        comment = message.split(':')[1]

                        # ----------------
                        # Some code is excluded for clarity sake
                        # Update worker's information and progress in "processList"
                        # ----------------

                    except EOFError:
                        print("EOF----")

                # ----------------
                # Some code is excluded for clarity sake
                # Here I draw some progress lines per workers
                # ----------------

            else:
                # worker has finished his job. Close the connection.
                process['con_parent'].close()

        # Whait for some time and monitor again
        sleep(PARAM['MONITOR_REFRESH_FREQUENCY'])


    print("================")
    print("**** DONE ! ****")
    print("================")

    # ----------------
    # Here I'm measuring memory usage to find the most "gluttonous" part of the code
    # ----------------
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')

    print("[ Memory trace - Top 10 ]")
    for stat in top_stats[:10]:
        print(stat)


'''

Run Code Online (Sandbox Code Playgroud)

mke*_*rig 11

对于将来接触此问题的人,我发现了一个似乎效果很好的技巧:

import spacy
import en_core_web_lg
import multiprocessing

docs = ['Your documents']

def process_docs(docs, n_processes=None):
    # Load the model inside the subprocess, 
    # as that seems to be the main culprit of the memory issues
    nlp = en_core_web_lg.load()

    if not n_processes:
        n_processes = multiprocessing.cpu_count()

    processed_docs = [doc for doc in nlp.pipe(docs, disable=['ner', 'parser'], n_process=n_processes)]


    # Then do what you wish beyond this point. I end up writing results out to s3.
    pass

for x in range(10):
    # This will spin up a subprocess, 
    # and everytime it finishes it will release all resources back to the machine.
    with multiprocessing.Manager() as manager:
        p = multiprocessing.Process(target=process_docs, args=(docs))
        p.start()
        p.join()
Run Code Online (Sandbox Code Playgroud)

这里的想法是将所有与 Spacy 相关的内容放入子进程中,以便在子进程完成后释放所有内存。我知道它正在工作,因为每次子进程完成时我实际上都可以看到内存被释放回实例(而且实例也不再崩溃 xD)。

全面披露:我不知道为什么 Spacy 似乎会随着时间的推移而增加内存,我已经阅读了所有内容,试图找到一个简单的答案,并且我看到的所有 github 问题都声称他们已经解决了该问题,但我仍然看到当我在 AWS Sagemaker 实例上使用 Spacy 时会发生这种情况。

希望这对某人有帮助!我知道我花了好几个小时为此焦头烂额。

感谢另一个SO 答案,它详细解释了 Python 中的子流程。


aab*_*aab 7

内存泄漏与 spacy

处理大量数据时的内存问题似乎是一个已知问题,请参阅一些相关的github问题:

不幸的是,看起来还没有一个好的解决方案。

词形还原

查看您的特定词形还原任务,我认为您的示例代码有点过于简化了,因为您在单个单词上运行完整的 spacy 管道,然后对结果没有做任何事情(甚至没有检查引理?),所以很难说你真正想要做什么。

我假设您只想进行词形还原,因此一般而言,您希望尽可能多地禁用您不使用的管道部分(尤其是仅进行词形还原时的解析,请参阅https://spacy。 io/usage/processing-pipelines#disabling ) 并用于nlp.pipe批量处理文档。如果您使用解析器或实体识别,Spacy 无法处理非常长的文档,因此您需要以某种方式分解文本(或者只是为了词形还原/标记,您可以nlp.max_length根据需要增加)。

像在您的示例中那样将文档分解为单个单词会违背大多数 spacy 分析的目的(您通常无法有意义地标记或解析单个单词),而且以这种方式调用 spacy 会非常慢。

查找词形还原

如果您只需要上下文之外的常用词的引理(其中标注器不会提供任何有用的信息),您可以查看查找词形还原器是否足以满足您的任务并跳过其余的处理:

from spacy.lemmatizer import Lemmatizer
from spacy.lang.en import LOOKUP
lemmatizer = Lemmatizer(lookup=LOOKUP)
print(lemmatizer(u"ducks", ''), lemmatizer(u"ducking", ''))
Run Code Online (Sandbox Code Playgroud)

输出:

['鸭子'] ['鸭子']

它只是一个静态查找表,因此它不会很好地处理未知单词或诸如“wugs”或“DUCKS”之类的单词的大写,因此您必须查看它是否适合您的文本,但它会在没有内存泄漏的情况下要快得多。(您也可以自己使用该表而无需空间,它在这里:https : //github.com/michmech/lemmatization-lists。)

更好的词形还原

否则,使用更像这样的东西来批量处理文本:

nlp = spacy.load('en', disable=['parser', 'ner'])
# if needed: nlp.max_length = MAX_DOC_LEN_IN_CHAR
for doc in nlp.pipe(texts):
  for token in doc:
    print(token.lemma_)
Run Code Online (Sandbox Code Playgroud)

如果您处理一个长文本(或nlp.pipe()用于许多较短的文本)而不是处理单个单词,则您应该能够在一个线程中每秒标记/词形化(许多)数千个单词。