Python中的多处理池 - 仅使用单个CPU

Mag*_*gie 7 python pool multiprocessing

原始问题

我试图在Python中使用多处理池.这是我的代码:

def f(x):
    return x

def foo():
    p = multiprocessing.Pool()
    mapper = p.imap_unordered

    for x in xrange(1, 11):
        res = list(mapper(f,bar(x)))
Run Code Online (Sandbox Code Playgroud)

当代码xrange很小时,这段代码使用所有CPU(我有8个CPU)xrange(1, 6).但是,当我将范围增加到xrange(1, 10).我观察到只有1个CPU以100%运行,而其余的只是空转.可能是什么原因?是因为,当我增加范围时,操作系统会因过热而关闭CPU吗?

我该如何解决这个问题?

最小,完整,可验证的例子

为了复制我的问题,我创建了这个例子:它是一个来自字符串问题的简单ngram生成.

#!/usr/bin/python

import time
import itertools
import threading
import multiprocessing
import random


def f(x):
    return x

def ngrams(input_tmp, n):
    input = input_tmp.split()

    if n > len(input):
        n = len(input)

    output = []
    for i in range(len(input)-n+1):
        output.append(input[i:i+n])
    return output 

def foo():

    p = multiprocessing.Pool()
    mapper = p.imap_unordered

    num = 100000000 #100
    rand_list = random.sample(xrange(100000000), num)

    rand_str = ' '.join(str(i) for i in rand_list)

    for n in xrange(1, 100):
        res = list(mapper(f, ngrams(rand_str, n)))


if __name__ == '__main__':
    start = time.time()
    foo()
    print 'Total time taken: '+str(time.time() - start)
Run Code Online (Sandbox Code Playgroud)

num小(例如num = 10000)时,我发现使用了所有8个CPU.然而,当num基本上很大时(例如num = 100000000).仅使用2个CPU,其余为空闲.这是我的问题.

注意:如果num太大,可能会导致系统/ VM崩溃.

aba*_*ert 7

首先,ngrams它本身需要很多时间.虽然这种情况正在发生,但显然只有一个核心.但即使完成(通过将ngrams呼叫移到外部并在其之前和之后mapper投入一个非常容易测试print),您仍然只使用一个核心.我得到100核100%,其他核心大约2%.

如果你在Python 3.4中尝试相同的东西,事情会有所不同 - 我仍然得到100%的1核心,但其他的是15-25%.

那么,发生了什么?好吧,在multiprocessing传递参数和返回值总是有一些开销.在你的情况下,这个开销完全淹没了实际的工作,这是公正的return x.

以下是开销的工作原理:主进程必须挑选值,然后将它们放在队列中,然后等待另一个队列上的值并取消它们.每个子进程在第一个队列上等待,取消排序值,执行任何操作,修补值,并将它们放在另一个队列中.必须同步对队列的访问(在大多数非Windows平台上通过POSIX信号量,我认为是Windows上的NT内核互斥体).

据我所知,你的流程花费了99%以上的时间等待队列或阅读或写作.

考虑到您需要处理大量数据,并且除了对数据进行pickle和unpickling之外没有计算,这并不是意外.

如果你看一下CPython 2.7中的源代码,那么SimpleQueue在持有锁的情况下会发生酸洗和去除.因此,几乎所有的工作都会在锁定时发生,这意味着它们最终都会在单个核心上进行序列化.

但是在CPython 3.4中,酸洗和去除发生在锁.显然,这足以使用15-25%的核心.(我相信这种变化发生在3.2,但我懒得跟踪它.)

尽管如此,即使在3.4上,你也要花费更多的时间来等待访问队列而不是做任何事情,甚至是multiprocessing开销.这就是为什么核心只能达到25%.

当然,你在开销上花费的时间比实际工作多一个数量级,这使得这不是一个很好的测试,除非你试图测试你可以从multiprocessing你的机器上的特定实现中获得的最大吞吐量或一些东西.

一些观察:

  • 在您的真实代码中,如果您可以找到一种方法来批量处理更大的任务(明确地 - 仅仅依赖于chunksize=1000此类或类似的东西将无济于事),这可能会解决您的大部分问题.
  • 如果您的巨型阵列(或其他)从未实际更改过,您可以在池初始化程序中传递它,而不是在每个任务中传递它,这几乎可以消除问题.
  • 如果它确实发生了变化,但只是来自主流程方面,则可能值得共享而不是传递数据.
  • 如果您需要从子进程中改变它,请查看是否有分区数据的方法,以便每个任务可以拥有一个没有争用的切片.
  • 即使您需要具有显式锁定的完全竞争的共享内存,它仍然可能比传递这么大的东西更好.
  • 可能值得从PyPI(或升级到Python 3.x)获得3.2+版本multiprocessing或第三方multiprocessing库之一的后端,只是为了将酸洗移出锁定.


And*_*ini 2

问题是你的f()函数(在单独的进程上运行的函数)没有做任何特殊的事情,因此它没有给 CPU 带来负载。

ngrams()另一方面,正在执行一些“繁重”计算,但您是在主进程上调用此函数,而不是在池中。

为了让事情更清楚,请考虑这段代码......

for n in xrange(1, 100):
    res = list(mapper(f, ngrams(rand_str, n)))
Run Code Online (Sandbox Code Playgroud)

...相当于这样:

for n in xrange(1, 100):
    arg = ngrams(rand_str, n)
    res = list(mapper(f, arg))
Run Code Online (Sandbox Code Playgroud)

另外,以下是在主进程上执行的 CPU 密集型操作:

num = 100000000
rand_list = random.sample(xrange(100000000), num)
Run Code Online (Sandbox Code Playgroud)

您应该更改代码,以便在池内调用sample()和,或者更改代码以执行 CPU 密集型操作,这样您就会看到所有 CPU 上的负载都很高。ngrams()f()