我的代码中有一个函数应该读取文件。每个文件大约 8M,但是读取速度太低,为了改进我使用多处理。可悲的是,它似乎被阻止了。我想知道有没有有什么方法可以帮助解决这个问题并提高阅读速度?
我的代码如下:
import multiprocessing as mp
import json
import os
def gainOneFile(filename):
file_from = open(filename)
json_str = file_from.read()
temp = json.loads(json_str)
print "load:",filename," len ",len(temp)
file_from.close()
return temp
def gainSortedArr(path):
arr = []
pool = mp.Pool(4)
for i in xrange(1,40):
abs_from_filename = os.path.join(path, "outputDict"+str(i))
result = pool.apply_async(gainOneFile,(abs_from_filename,))
arr.append(result.get())
pool.close()
pool.join()
arr = sorted(arr,key = lambda dic:len(dic))
return arr
Run Code Online (Sandbox Code Playgroud)
和调用函数:
whole_arr = gainSortedArr("sortKeyOut/")
Run Code Online (Sandbox Code Playgroud)
你有几个问题。首先,你没有并行化。你做:
result = pool.apply_async(gainOneFile,(abs_from_filename,))
arr.append(result.get())
Run Code Online (Sandbox Code Playgroud)
一遍又一遍,分派一个任务,然后立即调用.get()它等待它完成,然后再分派任何其他任务;实际上,您永远不会同时运行一个以上的工人。存储所有结果而不调用.get(),然后.get()稍后调用。或者只是使用Pool.map或 相关方法并为自己节省手动单个结果管理的麻烦,例如(使用imap_unordered以最大限度地减少开销,因为您只是在进行排序):
# Make generator of paths to load
paths = (os.path.join(path, "outputDict"+str(i)) for i in xrange(1, 40))
# Load them all in parallel, and sort the results by length (lambda is redundant)
arr = sorted(pool.imap_unordered(gainOneFile, paths), key=len)
Run Code Online (Sandbox Code Playgroud)
其次,multiprocessing必须pickle 和unpickle 在主进程和worker 之间发送的所有参数和返回值,并且所有这些都是通过管道发送的,这些管道会产生系统调用开销以启动。由于您的文件系统不太可能通过并行化读取获得显着的速度,因此它可能是净损失,而不是收益。
通过切换到基于线程的池,您可能会获得一些提升;更改import为import multiprocessing.dummy as mp,您将获得一个Pool在线程方面实现的版本;它们不围绕 CPython GIL 工作,但由于此代码几乎肯定是 I/O 绑定的,这几乎不重要,并且它删除了酸洗和解酸以及参与工作人员通信的 IPC。
最后,如果您在类似 UNIX 的系统上使用 Python 3.3 或更高版本,您可以让操作系统通过更积极地将文件拉入系统缓存来帮助您。如果您可以打开文件,则os.posix_fadvise在文件描述符(.fileno()在文件对象上)上使用其中一个WILLNEED或SEQUENTIAL它可能会在稍后通过在请求之前主动预取文件数据从文件读取时提高读取性能。
| 归档时间: |
|
| 查看次数: |
951 次 |
| 最近记录: |