The following code works, but it is very slow because large data sets are being passed to the processes. In the actual implementation, creating the processes and sending the data takes almost as long as the computation itself, so by the time the second process is created the first one has nearly finished its calculation, which makes the parallelization pointless.
The code is the same as in the question Multiprocessing has a cutoff at 992 integers being joined as result; the change suggested there worked and is implemented below. However, I have run into what I assume is a common problem for others: pickling large data takes a long time.
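(As a rough check that serialization really is the bottleneck, timing a pickle of a comparable structure is enough; the sizes and names below only mimic my data and are purely illustrative:)

import cPickle as pickle
import time

# Stand-in for the real data: ~4000 entries, each a dict of ~200 key/value pairs.
sample = [dict((str(k), k) for k in range(200)) for _ in range(4000)]

start = time.time()
blob = pickle.dumps(sample, protocol=pickle.HIGHEST_PROTOCOL)
print 'pickled %d bytes in %.2f seconds' % (len(blob), time.time() - start)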
I have seen answers that pass a shared-memory array using multiprocessing.Array. I have an array of roughly 4000 indices, but each index holds a dictionary with 200 key/value pairs. The data is only read by each process, some computation is done, and then a matrix (4000x3) is returned, with no dictionaries.
Answers such as Is shared readonly data copied to different processes for Python multiprocessing? use map. Is it possible to keep the structure below and still use shared memory? Is there an efficient way to send the data, with its array of dicts, to each process, for example by wrapping the dicts in some kind of manager and then putting that inside a multiprocessing.Array?
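For reference, the shared-memory pattern from those answers looks roughly like this; a minimal sketch with purely illustrative numeric contents and names, since my real data is a list of dicts rather than flat numbers:

import multiprocessing

def worker(shared, start, end, result_q):
    # Read-only access to the shared block; the data itself is never pickled.
    total = sum(shared[start:end])
    result_q.put(total)

if __name__ == '__main__':
    # A flat block of doubles in shared memory (lock-free because it is read-only).
    shared = multiprocessing.Array('d', range(4000), lock=False)
    result_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(shared, 0, 2000, result_q))
    p.start()
    print result_q.get()
    p.join()

My current code follows.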
import multiprocessing

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,200):
            data[str(i)] = i
    CalcManager(total,start=0,end=3000)

def CalcManager(myData,start,end):
    print 'in calc manager'

    #Multi processing
    #Set the number of processes to use.
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []

    #Divide up the data for the set number of processes
    interval = (end-start)/nprocs
    new_start = start

    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we dont go past the size of the data
        if new_end > end:
            new_end = end
        #Generate a new process and pass it the arguments
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end
        new_start = new_end+1
    print 'finished starting'

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = []
    temp = []
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    result_q.put(results)
    return

if __name__== '__main__':
    main()
SOLVED
Simply placing the list of dictionaries into a Manager solved the problem:
manager=Manager()
d=manager.list(myData)
It seems that a manager holding a list also manages the dictionaries contained in that list. Startup is still a bit slow, so it looks like the data is still being copied, but that copy happens once at the beginning, and the data is then sliced inside each process.
import multiprocessing
import multiprocessing.sharedctypes as mt
from multiprocessing import Process, Lock, Manager
from ctypes import Structure, c_double

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,100):
            data[str(i)] = i
    CalcManager(total,start=0,end=500)

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])
    manager = Manager()
    d = manager.list(myData)

    #Multi processing
    #Set the number of processes to use.
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []

    #Divide up the data for the set number of processes
    interval = (end-start)/nprocs
    new_start = start

    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure we dont go past the size of the data
        if new_end > end:
            new_end = end
        #Generate a new process and pass it the arguments
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(d,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end
        new_start = new_end+1
    print 'finished starting'

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    #print 'started process'
    results = []
    temp = []
    data = data[start:end]
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    print len(data)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()
You may see some improvement by using a multiprocessing.Manager to store the list in a manager server, and having each child process access items from the dicts by pulling them from that one shared list, rather than copying slices to each child process:
def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])
    manager = Manager()
    d = manager.list(myData)

    nprocs = 3
    result_q = multiprocessing.Queue()
    procs = []

    interval = (end-start)/nprocs
    new_start = start

    for i in range(nprocs):
        new_end = new_start + interval
        if new_end > end:
            new_end = end
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end
        new_start = new_end+1
    print 'finished starting'

    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()
This copies your entire data list into the Manager process prior to creating any of your workers. The Manager returns a Proxy object that allows shared access to the list. You then just pass the Proxy to the workers, which means their startup time will be greatly reduced, since there is no longer any need to copy slices of the data list. The downside here is that accessing the list will be slower in the children, because every access has to go through IPC to the manager process. Whether or not this really helps performance depends very much on what work you do on the list in your worker processes, but it is worth a try, since it requires very few code changes.
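One way to limit that IPC cost, if each worker only needs its own slice, is to pull the slice out of the proxy once and then work on a plain local list; a minimal sketch, where the body of the loop and the variable names are only illustrative:

def multiProcess(d, start, end, result_q, proc_num):
    # One IPC round trip: slicing the ListProxy returns an ordinary local list,
    # so the loop below never touches the manager process again.
    local = d[start:end]
    results = []
    for item in local:
        # item is a plain dict copied out of the manager; it can be read freely here.
        results.append(len(item))
    result_q.put(results)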