Kar*_*gat 28 python performance pool multiprocessing
(这个问题是关于如何使多处理.Pool()更快地运行代码.我终于解决了它,最后的解决方案可以在帖子的底部找到.)
原始问题:
我正在尝试使用Python将单词与列表中的许多其他单词进行比较,并检索最相似的单词列表.为此,我使用difflib.get_close_matches函数.我使用的是Python 2.6.5相对较新且功能强大的Windows 7笔记本电脑.
我想要的是加快比较过程,因为我的比较单词列表很长,我不得不多次重复比较过程.当我听说多处理模块时,似乎合乎逻辑的是,如果比较可以分解为工作任务并同时运行(从而利用机器功率换取更快的速度),我的比较任务将更快完成.
然而,即使在尝试了许多不同的方法,并使用了文档中显示并在论坛帖子中建议的方法之后,Pool方法似乎非常慢,比在整个列表上运行原始get_close_matches函数慢得多.一旦.我想帮助理解为什么Pool()如此缓慢以及我是否正确使用它.我只使用这个字符串比较方案作为一个例子,因为这是我能想到的最新例子,我无法理解或者让多处理工作而不是反对我.下面是difflib场景中的一个示例代码,显示了普通方法和Pooled方法之间的时间差异:
from multiprocessing import Pool
import random, time, difflib
# constants
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(1000000)]
mainword = "hello"
# comparison function
def findclosematch(subwordlist):
matches = difflib.get_close_matches(mainword,subwordlist,len(subwordlist),0.7)
if matches <> []:
return matches
# pool
print "pool method"
if __name__ == '__main__':
pool = Pool(processes=3)
t=time.time()
result = pool.map_async(findclosematch, wordlist, chunksize=100)
#do something with result
for r in result.get():
pass
print time.time()-t
# normal
print "normal method"
t=time.time()
# run function
result = findclosematch(wordlist)
# do something with results
for r in result:
pass
print time.time()-t
Run Code Online (Sandbox Code Playgroud)
要找到的单词是"hello",找到近似匹配的单词列表是一个100万个长的5个随机连接字符列表(仅用于说明目的).我使用了3个处理器核心和地图功能,块大小为100(我认为每个工作人员都会列出一个列表项目)(我也尝试过1000和10 000的块大小,但没有真正的区别).请注意,在两种方法中,我都在调用函数之前启动计时器,并在循环结果后立即结束计时器.正如您在下面看到的,时序结果显然有利于原始的非Pool方法:
>>>
pool method
37.1690001488 seconds
normal method
10.5329999924 seconds
>>>
Run Code Online (Sandbox Code Playgroud)
Pool方法几乎比原始方法慢4倍.我在这里缺少什么,或者可能误解了Pooling /多处理是如何工作的?我怀疑这里的部分问题可能是map函数返回None,因此向结果列表添加了数千个不必要的项,即使我只想将实际匹配返回到结果并在函数中将其写入.据我所知,这就是地图的工作原理.我听说过其他一些函数,比如filter只收集非False结果,但我不认为multiprocessing/Pool支持filter方法.在多处理模块中除了map/imap还有其他功能可以帮助我只返回我的函数返回的内容吗?应用函数更多是为了给出多个参数,据我所知.
我知道还有imap功能,我试过但没有任何时间改进.原因也就是为什么我在理解itertools模块的优点时遇到了问题,据说是"快速闪电",我注意到这是调用函数的真实情况,但根据我的经验以及从我读过的内容来看因为调用函数实际上并没有进行任何计算,所以当需要迭代结果来收集和分析它们时(没有它们就没有任何意义来调用cuntion)它需要的时间多于或有时多于一个只使用正常版本的函数直接.但我想这是另一篇文章.
无论如何,很高兴看到有人能在这里向我推进正确的方向,并且真的很感激任何帮助.我更感兴趣的是理解多处理,而不是让这个例子起作用,尽管它有助于我理解一些示例解决方案代码建议.
答案:
似乎减速与其他流程的启动时间较慢有关.我无法让.Pool()函数足够快.我使其更快的最终解决方案是手动拆分工作负载列表,使用多个.Process()而不是.Pool(),并返回队列中的解决方案.但是我想知道是否最重要的改变可能是用主要词来分解工作量而不是要比较的词,可能是因为difflib搜索功能已经如此之快.下面是同时运行5个进程的新代码,结果比运行简单代码快了大约x10(6秒对55秒).对于快速模糊查找非常有用,除了difflib已经有多快.
from multiprocessing import Process, Queue
import difflib, random, time
def f2(wordlist, mainwordlist, q):
for mainword in mainwordlist:
matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
q.put(matches)
if __name__ == '__main__':
# constants (for 50 input words, find closest match in list of 100 000 comparison words)
q = Queue()
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(100000)]
mainword = "hello"
mainwordlist = [mainword for each in xrange(50)]
# normal approach
t = time.time()
for mainword in mainwordlist:
matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
q.put(matches)
print time.time()-t
# split work into 5 or 10 processes
processes = 5
def splitlist(inlist, chunksize):
return [inlist[x:x+chunksize] for x in xrange(0, len(inlist), chunksize)]
print len(mainwordlist)/processes
mainwordlistsplitted = splitlist(mainwordlist, len(mainwordlist)/processes)
print "list ready"
t = time.time()
for submainwordlist in mainwordlistsplitted:
print "sub"
p = Process(target=f2, args=(wordlist,submainwordlist,q,))
p.Daemon = True
p.start()
for submainwordlist in mainwordlistsplitted:
p.join()
print time.time()-t
while True:
print q.get()
Run Code Online (Sandbox Code Playgroud)
The*_*inn 13
这些问题通常归结为以下几点:
您尝试并行化的功能不需要足够的CPU资源(即CPU时间)来合理化并行化!
当然,当您与并行时multiprocessing.Pool(8),理论上(但实际上不是)可以将速度提高8倍。
但是,请记住,这不是免费的-您以以下开销为代价获得了并行化:
task为传递给的每个chunk(大小为chunksize)创建一个iterPool.map(f, iter)task
task和task's返回值(思考 pickle.dumps())task和task's返回值(认为 pickle.loads())Locks共享内存Queues,而工作进程和父进程get()以及put()往返于这些进程Queues。os.fork()每个工作进程的一次调用成本非常昂贵。本质上,使用Pool()时需要:
iter证明上述(3)的一次性成本合理合理。要进行更深入的探索,本文和链接的演练逐步说明了如何将大量数据传递给Pool.map()(和朋友),使您陷入麻烦。
Raymond Hettinger在这里还讨论了Python并发的正确使用。
我最好的猜测是进程间通信(IPC)开销.在单进程实例中,单个进程具有单词列表.委托各种其他流程时,主流程需要不断将列表中的部分传递给其他流程.
因此,一个更好的方法可能是分离n个进程,每个进程负责加载/生成列表的1/n段并检查该单词是否在列表的那一部分.
不过,我不知道如何使用Python的多处理库做到这一点.
小智 5
我在不同的问题上遇到了与 Pool 类似的事情。我现在不确定真正的原因......
OP Karim Bahgat 编辑的答案与对我有用的解决方案相同。切换到 Process & Queue 系统后,我能够看到与机器内核数量成线性关系的加速。
这是一个例子。
def do_something(data):
return data * 2
def consumer(inQ, outQ):
while True:
try:
# get a new message
val = inQ.get()
# this is the 'TERM' signal
if val is None:
break;
# unpack the message
pos = val[0] # its helpful to pass in/out the pos in the array
data = val[1]
# process the data
ret = do_something(data)
# send the response / results
outQ.put( (pos, ret) )
except Exception, e:
print "error!", e
break
def process_data(data_list, inQ, outQ):
# send pos/data to workers
for i,dat in enumerate(data_list):
inQ.put( (i,dat) )
# process results
for i in range(len(data_list)):
ret = outQ.get()
pos = ret[0]
dat = ret[1]
data_list[pos] = dat
def main():
# initialize things
n_workers = 4
inQ = mp.Queue()
outQ = mp.Queue()
# instantiate workers
workers = [mp.Process(target=consumer, args=(inQ,outQ))
for i in range(n_workers)]
# start the workers
for w in workers:
w.start()
# gather some data
data_list = [ d for d in range(1000)]
# lets process the data a few times
for i in range(4):
process_data(data_list)
# tell all workers, no more data (one msg for each)
for i in range(n_workers):
inQ.put(None)
# join on the workers
for w in workers:
w.join()
# print out final results (i*16)
for i,dat in enumerate(data_list):
print i, dat
Run Code Online (Sandbox Code Playgroud)