池工作者的Python多进程 - 内存使用优化

Ste*_*fan 5 python performance memory-management multiprocessing string-matching

我有一个模糊的字符串匹配脚本,在400万公司名称的大海捞针中寻找大约30K针.虽然脚本工作正常,但我在AWS h1.xlarge上通过并行处理加速处理的尝试失败了,因为我的内存不足.

我不想试图获得更多内存,而是根据我之前的问题解释,我想找出如何优化工作流程 - 我对此很新,所以应该有足够的空间.顺便说一句,我已经尝试过排队(虽然工作但是遇到了同样的问题MemoryError,再看看一堆非常有用的SO贡献,但还没有完成.)

这是与代码最相关的内容.我希望它足以澄清逻辑 - 很高兴根据需要提供更多信息:

def getHayStack():
    ## loads a few million company names into id: name dict
    return hayCompanies

def getNeedles(*args):
    ## loads subset of 30K companies into id: name dict (for allocation to workers)
    return needleCompanies

def findNeedle(needle, haystack):
    """ Identify best match and return results with score """
    results = {}
    for hayID, hayCompany in haystack.iteritems():
        if not isnull(haystack[hayID]):
            results[hayID] = levi.setratio(needle.split(' '), 
                                           hayCompany.split(' '))
    scores = list(results.values())
    resultIDs = list(results.keys())
    needleID = resultIDs[scores.index(max(scores))]
    return [needleID, haystack[needleID], max(scores)]

def runMatch(args):
    """ Execute findNeedle and process results for poolWorker batch"""
    batch, first = args
    last = first + batch
    hayCompanies = getHayStack()
    needleCompanies = getTargets(first, last)
    needles = defaultdict(list)
    current = first
    for needleID, needleCompany in needleCompanies.iteritems():
        current += 1
        needles[targetID] = findNeedle(needleCompany, hayCompanies)
    ## Then store results

if __name__ == '__main__':
    pool = Pool(processes = numProcesses)
    totalTargets = len(getTargets('all'))
    targetsPerBatch = totalTargets / numProcesses
    pool.map_async(runMatch, 
                   itertools.izip(itertools.repeat(targetsPerBatch),
                                  xrange(0, 
                                         totalTargets,
                                         targetsPerBatch))).get(99999999)
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

所以我想问题是:我怎样才能避免为所有工人加载大海捞针 - 例如通过共享数据或采取不同的方法,例如将更大的干草堆分成工人而不是针头?如何通过避免或消除混乱来改善内存使用?

aba*_*ert 4

你的设计有点混乱。您正在使用一个由 N 个工作人员组成的池,然后将您的 M 个工作分解为大小为 M/N 的 N 个任务。换句话说,如果您一切都正确,那么您就是在构建于工作进程之上的池之上模拟工作进程。为什么要这么麻烦呢?如果要使用进程,直接使用即可。或者,将池用作池,将每个作业作为其自己的任务发送,并使用批处理功能以某种适当(且可调整)的方式对它们进行批处理。

\n\n

这意味着runMatch只需要一个needleID和needeCompany,它所做的就是调用findNeedle,然后执行该# Then store results部分的任何操作。然后主程序就变得简单多了:

\n\n
if __name__ == \'__main__\':\n    with Pool(processes=numProcesses) as pool:\n        results = pool.map_async(runMatch, needleCompanies.iteritems(), \n                                 chunkSize=NUMBER_TWEAKED_IN_TESTING).get()\n
Run Code Online (Sandbox Code Playgroud)\n\n

或者,如果结果很小,则不必让所有进程(大概)争夺某些共享的结果存储内容,只需返回它们即可。runMatch那么你根本不需要,只需:

\n\n
if __name__ == \'__main__\':\n    with Pool(processes=numProcesses) as pool:\n        for result in pool.imap_unordered(findNeedle, needleCompanies.iteritems(), \n                                          chunkSize=NUMBER_TWEAKED_IN_TESTING):\n            # Store result\n
Run Code Online (Sandbox Code Playgroud)\n\n

或者,如果您确实想要执行 N 批,只需为每批创建一个进程:

\n\n
if __name__ == \'__main__\':\n    totalTargets = len(getTargets(\'all\'))\n    targetsPerBatch = totalTargets / numProcesses\n    processes = [Process(target=runMatch, \n                         args=(targetsPerBatch,\n                               xrange(0, \n                                      totalTargets,\n                                      targetsPerBatch))) \n                 for _ in range(numProcesses)]\n    for p in processes:\n        p.start()\n    for p in processes:\n        p.join()\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

另外,您似乎getHayStack()为每个任务调用一次(getNeedles也)。我不确定同时获得此活动的多个副本有多容易,但考虑到它是迄今为止最大的数据结构,这将是我尝试的第一件事排除。事实上,即使这不是内存使用问题,getHayStack也很容易对性能造成很大影响,除非您已经进行了某种缓存(例如,显式地将其存储在全局或可变的默认参数值中)第一次,然后就使用它),所以无论如何它可能值得修复。

\n\n

同时解决这两个潜在问题的一种方法是在Pool构造函数中使用初始化程序:

\n\n
def initPool():\n    global _haystack\n    _haystack = getHayStack()\n\ndef runMatch(args):\n    global _haystack\n    # ...\n    hayCompanies = _haystack\n    # ...\n\nif __name__ == \'__main__\':\n    pool = Pool(processes=numProcesses, initializer=initPool)\n    # ...\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

接下来,我注意到您在实际上并不需要的多个位置显式生成了列表。例如:

\n\n
scores = list(results.values())\nresultIDs = list(results.keys())\nneedleID = resultIDs[scores.index(max(scores))]\nreturn [needleID, haystack[needleID], max(scores)]\n
Run Code Online (Sandbox Code Playgroud)\n\n

如果结果较多,则这是浪费;直接使用results.values()iterable就可以了。(事实上​​,看起来您正在使用 Python 2.x,在这种情况下keys已经values是列表,因此您只是无缘无故地制作了一个额外的副本。)

\n\n

但在这种情况下,您可以进一步简化整个事情。您只是在寻找分数最高的键(resultID)和值(分数),对吧?所以:

\n\n
needleID, score = max(results.items(), key=operator.itemgetter(1))\nreturn [needleID, haystack[needleID], score]\n
Run Code Online (Sandbox Code Playgroud)\n\n

这也消除了对 的所有重复搜索score,这应该节省一些 CPU。

\n\n
\n\n

这可能无法直接解决内存问题,但它有望使调试和/或调整变得更容易。

\n\n

首先要尝试的是使用更小的批次 \xe2\x80\x94 而不是 input_size/cpu_count,尝试 1. 内存使用量是否会下降?如果没有,我们就排除了该部分。

\n\n

接下来,尝试sys.getsizeof(_haystack)看看它说了什么。例如,如果它是 1.6GB,那么您就可以将所有其他内容压缩到 0.4GB 中,这就是攻击它的方法\xe2\x80\x94,例如,使用数据库shelve而不是普通的dict.

\n\n

还可以尝试在初始化函数的开始和结束处转储内存使用情况(使用resource模块, )。getrusage(RUSAGE_SELF)如果最终的大海捞针只有 0.3GB,但你又分配了 1.3GB 来构建它,那就是要解决的问题。例如,您可以分拆一个子进程来构建和腌制字典,然后让池初始化程序打开它并取消腌制它。或者结合两个\xe2\x80\x94shelve在第一个子级中构建一个数据库,并在初始化程序中以只读方式打开它。无论哪种方式,这也意味着您只执行一次 CSV 解析/字典构建工作,而不是 8 次。

\n\n

另一方面,如果您的虚拟机总使用量仍然很低(请注意,getrusage没有任何方法可以直接查看您的虚拟机总大小\xe2\x80\x94ru_maxrss通常是一个有用的近似值,特别是如果ru_nswap为 0)第一个任务运行,问题出在任务本身。

\n\n

第一的,getsizeof任务函数的参数和返回的值。如果它们很大,特别是如果它们在每个任务中不断变大或者变化很大,则可能只是腌制和取消腌制数据占用太多内存,最终其中 8 个加起来足以达到极限。

\n\n

否则,问题很可能出在任务函数本身。要么你有内存泄漏(你只能通过使用缺陷的C扩展模块或ctypes,但如果你在调用之间保留任何引用,例如,在全局中,你可能会永远保留东西不必要的),或者某些任务本身占用了太多内存。无论哪种方式,这应该是您可以通过拉出多处理并直接运行任务来更轻松地测试的东西,这更容易调试。

\n