Passing an array of dictionaries through shared memory with multiprocessing

use*_*637 5 — python, arrays, dictionary, multiprocessing, python-2.7

The following code works, but it is very slow because a large dataset is passed to each process. In the actual implementation, creating the processes and sending the data takes almost as long as the computation itself, so by the time the second process is created the first one has nearly finished its work, making the parallelization pointless.

The code is the same as in this question, Multiprocessing has cutoff at 992 integers being joined as result, with the suggested change implemented below. However, I have run into what seems to be a common problem: pickling large amounts of data takes a long time.
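As a rough illustration of where that time goes (a sketch, not the question's actual code): every argument passed through `multiprocessing.Process(args=...)` is pickled in the parent and unpickled in the child, so a list of roughly 4000 dicts with 200 entries each gets serialized once per child process.

```python
import pickle

# Build a dataset of the approximate shape described in the question:
# ~4000 entries, each a dict of 200 key/value pairs.
data = [{str(i): i for i in range(200)} for _ in range(4000)]

# This is the serialization step multiprocessing performs implicitly
# for every child process that receives `data` as an argument.
blob = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
print(len(blob))  # a multi-megabyte payload per child in this sketch
```

The unpickling on the child side roughly doubles that cost, which is consistent with process startup rivaling the computation time.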

I have seen answers that use multiprocessing.Array to pass a shared-memory array. I have an array of about 4000 indices, but each index holds a dictionary with 200 key/value pairs. Each process only reads the data, does some computation, and then returns a matrix (4000x3), with no dictionaries.

Answers like this one, Is shared readonly data copied to different processes for Python multiprocessing?, use map. Is it possible to keep the structure below while using shared memory? Is there an efficient way to send the data to each process with the array of dicts intact, for example by wrapping the dicts in some manager and then putting that inside a multiprocessing.Array?
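For context (a minimal sketch, not from the question): `multiprocessing.Array` shares a flat ctypes buffer between processes, which is why it works for numeric data but cannot hold Python dicts directly; the dicts would have to be flattened into primitive values first.

```python
import multiprocessing

# A shared ctypes buffer of C ints: children can read it without any
# pickling, but only primitive typecodes ('i', 'd', 'c', ...) are
# supported -- there is no typecode for a dict.
shared = multiprocessing.Array('i', range(10), lock=False)
print(list(shared))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```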

import multiprocessing

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,200):
            data[str(i)] = i

    CalcManager(total,start=0,end=3000)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Start the next chunk where this one ended (new_end + 1 would skip an element)
        new_start = new_end
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = []
    temp = []
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    result_q.put(results)
    return

if __name__== '__main__':   
    main()

Solved

Simply putting the list of dictionaries into a Manager solved the problem.

from multiprocessing import Manager

manager = Manager()
d = manager.list(myData)

A manager that holds a list apparently also manages the dictionaries that list contains. Startup is still a bit slow, so it looks like the data is still being copied, but the copy happens once at the beginning, and the data is then sliced inside each process.

import multiprocessing
from multiprocessing import Manager

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,100):
            data[str(i)] = i

    CalcManager(total,start=0,end=500)

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(d,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Start the next chunk where this one ended (new_end + 1 would skip an element)
        new_start = new_end
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    #print 'started process'
    results = []
    temp = []
    data = data[start:end]
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    print len(data)        
    result_q.put(results)
    return

if __name__ == '__main__':
    main()

dan*_*ano 2

You may see some improvement by using a multiprocessing.Manager to store the list in a manager server, and having each child process access items from that shared list instead of copying a slice to each child:

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    nprocs = 3 
    result_q = multiprocessing.Queue()
    procs = []

    interval = (end-start)/nprocs 
    new_start = start

    for i in range(nprocs):
        new_end = new_start + interval
        if new_end > end:
            new_end = end 
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #Start the next chunk where this one ended (new_end + 1 would skip an element)
        new_start = new_end
    print 'finished starting'        

    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

This copies your entire data list into the manager process before any of the workers are created. The Manager returns a Proxy object that allows shared access to the list. You then simply pass the Proxy to the workers, which means their startup time will be greatly reduced, since slices of the data list no longer need to be copied. The downside is that accessing the list will be slower in the children, because each access has to go through IPC to the manager process. Whether this actually helps performance depends heavily on what work you do on the list in your worker processes, but it is worth a try, since it requires very few code changes.