我有以下设置:
do_some_processing(filename):
for line in file(filename):
if line.split(',')[0] in big_lookup_object:
# something here
if __name__ == '__main__':
big_lookup_object = marshal.load('file.bin')
pool = Pool(processes=4)
print pool.map(do_some_processing, glob.glob('*.data'))
Run Code Online (Sandbox Code Playgroud)
我正在将一些大对象加载到内存中,然后创建一个需要使用该大对象的工作池.大对象以只读方式访问,我不需要在进程之间传递它的修改.
我的问题是:加载到共享内存中的大对象,如果我在unix/c中生成进程,或者每个进程是否加载了自己的大对象副本?
更新:进一步澄清 - big_lookup_object是一个共享查找对象.我不需要拆分它并单独处理它.我需要保留一份副本.我需要分割它的工作是读取许多其他大文件,并在查找对象中查找这些大文件中的项目.
进一步更新:数据库是一个很好的解决方案,memcached可能是一个更好的解决方案,磁盘上的文件(shelve或dbm)可能更好.在这个问题中,我对内存解决方案特别感兴趣.对于最终的解决方案,我将使用hadoop,但我想看看我是否也可以拥有本地内存版本.
这应该是我的第三个也是最后一个问题,关于我试图提高我在python上做的一些统计分析的性能.我有两个版本的代码(单核与多处理),我期望通过使用多个核来获得性能,因为我希望我的代码能够解压缩/解压缩相当多的二进制字符串,遗憾的是我注意到性能实际上通过使用多个来减少核心.
我想知道是否有人对我观察到的内容有可能的解释(向下滚动到4月16日更新以获取更多信息)?
程序的关键部分是函数numpy_array(多处理中的+解码),下面的代码片段(可通过pastebin访问完整代码,下面进一步说明):
def numpy_array(data, peaks):
rt_counter=0
for x in peaks:
if rt_counter %(len(peaks)/20) == 0:
update_progress()
peak_counter=0
data_buff=base64.b64decode(x)
buff_size=len(data_buff)/4
unpack_format=">%dL" % buff_size
index=0
for y in struct.unpack(unpack_format,data_buff):
buff1=struct.pack("I",y)
buff2=struct.unpack("f",buff1)[0]
if (index % 2 == 0):
data[rt_counter][1][peak_counter][0]=float(buff2)
else:
data[rt_counter][1][peak_counter][1]=float(buff2)
peak_counter+=1
index+=1
rt_counter+=1
Run Code Online (Sandbox Code Playgroud)
多处理版本使用一组函数执行此操作,我将在下面显示键2:
def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj())
def numpy_array(shared_arr,peaks):
processors=mp.cpu_count()
with contextlib.closing(mp.Pool(processes=processors,
initializer=pool_init,
initargs=(shared_arr, ))) as pool:
chunk_size=int(len(peaks)/processors)
map_parameters=[]
for i in range(processors):
counter = i*chunk_size
chunk=peaks[i*chunk_size:(i+1)*chunk_size]
map_parameters.append((chunk, counter))
pool.map(decode,map_parameters)
def decode ((chunk, counter)):
data=tonumpyarray(shared_arr).view(
[('f0','<f4'), …Run Code Online (Sandbox Code Playgroud) 我们在Linux机器上运行了大约10个Python进程,它们都读取相同的大型数据结构(恰好是Pandas DataFrame,本质上是2D numpy矩阵).
这些进程必须尽快响应查询,并且将数据保存在磁盘上的速度根本不足以满足我们的需求.
我们真正需要的是所有进程都可以完全随机访问内存中的数据结构,因此它们可以检索执行任意计算所需的所有元素.
由于其大小,我们无法在内存中复制数据结构10次(甚至两次).
有没有办法让所有10个Python进程可以共享对内存中数据结构的随机访问?
我使用这个脚本(见最后的代码)来评估在父进程被 fork 时是否共享或复制了全局对象。
简而言之,脚本创建一个全局data对象,子进程迭代data. 该脚本还监视内存使用情况以评估对象是否在子进程中被复制。
结果如下:
data = np.ones((N,N)). 子进程中的操作:
data.sum(). 结果:data被共享(无复制)data = list(range(pow(10, 8))). 子进程中的操作:sum(data). 结果:data被复制。data = list(range(pow(10, 8))). 子进程中的操作:for x in data: pass. 结果:data被复制。由于写时复制,预期结果 1)。我对结果 2) 和 3) 有点困惑。为什么会被data复制?
脚本
import multiprocessing as mp
import numpy as np
import logging
import os
logger = mp.log_to_stderr(logging.WARNING)
def …Run Code Online (Sandbox Code Playgroud) 我注意到我可以访问子进程函数/目标之外的子进程中的函数和模块。所以我想知道当我在 python 中创建一个子进程时,它是否从当前进程中复制了所有内容?为什么我可以访问子目标之外的函数和导入的模块?
from multiprocessing import Process, Pipe
def test1():
return "hello"
def simpleChildProcess( childPipe ):
# simpleChildProcess can access test1 function
foo = test1()
childPipe.send( foo )
parentPipe, childPipe = Pipe()
childProcess = Process( target=simpleChildProcess, args=(childPipe,) )
childProcess.start()
print "Pipe Contains: %s" % parentPipe.recv()
Run Code Online (Sandbox Code Playgroud) 我有一个单线程Python程序,我想修改它以使用它运行的服务器上的所有32个处理器.正如我所设想的那样,每个工作进程都会从队列中接收其作业并将其输出提交到队列.但是,要完成其工作,每个工作进程都需要只读访问复杂的内存中数据结构 - 许多千兆字节的dicts和彼此链接的对象.在python中,有没有一种简单的方法来共享这个数据结构,而不为每个工作进程复制它?
谢谢.