Mag*_*gie 7 python pool multiprocessing
我试图在Python中使用多处理池.这是我的代码:
def f(x):
return x
def foo():
p = multiprocessing.Pool()
mapper = p.imap_unordered
for x in xrange(1, 11):
res = list(mapper(f,bar(x)))
Run Code Online (Sandbox Code Playgroud)
当代码xrange
很小时,这段代码使用所有CPU(我有8个CPU)xrange(1, 6)
.但是,当我将范围增加到xrange(1, 10)
.我观察到只有1个CPU以100%运行,而其余的只是空转.可能是什么原因?是因为,当我增加范围时,操作系统会因过热而关闭CPU吗?
我该如何解决这个问题?
为了复制我的问题,我创建了这个例子:它是一个来自字符串问题的简单ngram生成.
#!/usr/bin/python
import time
import itertools
import threading
import multiprocessing
import random
def f(x):
return x
def ngrams(input_tmp, n):
input = input_tmp.split()
if n > len(input):
n = len(input)
output = []
for i in range(len(input)-n+1):
output.append(input[i:i+n])
return output
def foo():
p = multiprocessing.Pool()
mapper = p.imap_unordered
num = 100000000 #100
rand_list = random.sample(xrange(100000000), num)
rand_str = ' '.join(str(i) for i in rand_list)
for n in xrange(1, 100):
res = list(mapper(f, ngrams(rand_str, n)))
if __name__ == '__main__':
start = time.time()
foo()
print 'Total time taken: '+str(time.time() - start)
Run Code Online (Sandbox Code Playgroud)
当num
小(例如num = 10000
)时,我发现使用了所有8个CPU.然而,当num
基本上很大时(例如num = 100000000
).仅使用2个CPU,其余为空闲.这是我的问题.
注意:如果num
太大,可能会导致系统/ VM崩溃.
首先,ngrams
它本身需要很多时间.虽然这种情况正在发生,但显然只有一个核心.但即使完成(通过将ngrams
呼叫移到外部并在其之前和之后mapper
投入一个非常容易测试print
),您仍然只使用一个核心.我得到100核100%,其他核心大约2%.
如果你在Python 3.4中尝试相同的东西,事情会有所不同 - 我仍然得到100%的1核心,但其他的是15-25%.
那么,发生了什么?好吧,在multiprocessing
传递参数和返回值总是有一些开销.在你的情况下,这个开销完全淹没了实际的工作,这是公正的return x
.
以下是开销的工作原理:主进程必须挑选值,然后将它们放在队列中,然后等待另一个队列上的值并取消它们.每个子进程在第一个队列上等待,取消排序值,执行任何操作,修补值,并将它们放在另一个队列中.必须同步对队列的访问(在大多数非Windows平台上通过POSIX信号量,我认为是Windows上的NT内核互斥体).
据我所知,你的流程花费了99%以上的时间等待队列或阅读或写作.
考虑到您需要处理大量数据,并且除了对数据进行pickle和unpickling之外没有计算,这并不是太意外.
如果你看一下CPython 2.7中的源代码,那么SimpleQueue
在持有锁的情况下会发生酸洗和去除.因此,几乎所有的工作都会在锁定时发生,这意味着它们最终都会在单个核心上进行序列化.
但是在CPython 3.4中,酸洗和去除发生在锁外.显然,这足以使用15-25%的核心.(我相信这种变化发生在3.2,但我懒得跟踪它.)
尽管如此,即使在3.4上,你也要花费更多的时间来等待访问队列而不是做任何事情,甚至是multiprocessing
开销.这就是为什么核心只能达到25%.
当然,你在开销上花费的时间比实际工作多一个数量级,这使得这不是一个很好的测试,除非你试图测试你可以从multiprocessing
你的机器上的特定实现中获得的最大吞吐量或一些东西.
一些观察:
chunksize=1000
此类或类似的东西将无济于事),这可能会解决您的大部分问题.multiprocessing
或第三方multiprocessing
库之一的后端,只是为了将酸洗移出锁定.问题是你的f()
函数(在单独的进程上运行的函数)没有做任何特殊的事情,因此它没有给 CPU 带来负载。
ngrams()
另一方面,正在执行一些“繁重”计算,但您是在主进程上调用此函数,而不是在池中。
为了让事情更清楚,请考虑这段代码......
for n in xrange(1, 100):
res = list(mapper(f, ngrams(rand_str, n)))
Run Code Online (Sandbox Code Playgroud)
...相当于这样:
for n in xrange(1, 100):
arg = ngrams(rand_str, n)
res = list(mapper(f, arg))
Run Code Online (Sandbox Code Playgroud)
另外,以下是在主进程上执行的 CPU 密集型操作:
num = 100000000
rand_list = random.sample(xrange(100000000), num)
Run Code Online (Sandbox Code Playgroud)
您应该更改代码,以便在池内调用sample()
和,或者更改代码以执行 CPU 密集型操作,这样您就会看到所有 CPU 上的负载都很高。ngrams()
f()
归档时间: |
|
查看次数: |
2183 次 |
最近记录: |