我有以下设置:
do_some_processing(filename):
for line in file(filename):
if line.split(',')[0] in big_lookup_object:
# something here
if __name__ == '__main__':
big_lookup_object = marshal.load('file.bin')
pool = Pool(processes=4)
print pool.map(do_some_processing, glob.glob('*.data'))
Run Code Online (Sandbox Code Playgroud)
我正在将一些大对象加载到内存中,然后创建一个需要使用该大对象的工作池.大对象以只读方式访问,我不需要在进程之间传递它的修改.
我的问题是:加载到共享内存中的大对象,如果我在unix/c中生成进程,或者每个进程是否加载了自己的大对象副本?
更新:进一步澄清 - big_lookup_object是一个共享查找对象.我不需要拆分它并单独处理它.我需要保留一份副本.我需要分割它的工作是读取许多其他大文件,并在查找对象中查找这些大文件中的项目.
进一步更新:数据库是一个很好的解决方案,memcached可能是一个更好的解决方案,磁盘上的文件(shelve或dbm)可能更好.在这个问题中,我对内存解决方案特别感兴趣.对于最终的解决方案,我将使用hadoop,但我想看看我是否也可以拥有本地内存版本.
我试图从子进程返回值,但遗憾的是这些值是不可取消的.所以我在线程模块中使用了全局变量并且成功但在使用多处理模块时无法检索子进程中完成的更新.我希望我错过了一些东西.
最终打印的结果始终与给定vars dataDV03和dataDV04的初始值相同.子进程正在更新这些全局变量,但这些全局变量在父级中保持不变.
import multiprocessing
# NOT ABLE to get python to return values in passed variables.
ants = ['DV03', 'DV04']
dataDV03 = ['', '']
dataDV04 = {'driver': '', 'status': ''}
def getDV03CclDrivers(lib): # call global variable
global dataDV03
dataDV03[1] = 1
dataDV03[0] = 0
# eval( 'CCL.' + lib + '.' + lib + '( "DV03" )' ) these are unpicklable instantiations
def getDV04CclDrivers(lib, dataDV04): # pass global variable
dataDV04['driver'] = 0 # eval( 'CCL.' + lib + '.' …Run Code Online (Sandbox Code Playgroud) 我在python中定义一个函数.程序文件名本身是abc_d.py.我不明白我是否可以再次导入同一个文件.
import numpy as np
import matplotlib.pyplot as plt
import sys
import multiprocessing
num_processor=4
pool = multiprocessing.Pool(num_processor)
def abc(data):
w=np.dot(data.reshape(25,1),data.reshape(1,25))
return w
data_final=np.array(range(100))
n=100
error=[]
k_list=[50,100,500,1000,2000]
for k in k_list:
dict_data={}
for d_set in range(num_processor):
dict_data[d_set]=data_final[int(d_set*n/4):int((d_set+1)*n/4)]
if(d_set==num_processor-1):
dict_data[d_set]=data_final[int(d_set*n/4):]
tasks = dict_data
results_w=[pool.apply_async(abc,dict_data[t]) for t in range(num_processor)]
w_f=[]
for result in results_w:
w_s=result.get()
w_f.append(w_s.tolist())
w_f=np.array(w_f)
print (w_f)
Run Code Online (Sandbox Code Playgroud)
其中tasks是带数组的字典.
错误:
任何人都可以解释错误.我对python还不太熟悉.
Process ForkPoolWorker-1:
Process ForkPoolWorker-2:
Process ForkPoolWorker-3:
Process ForkPoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
File …Run Code Online (Sandbox Code Playgroud) 我想做的是每个过程都使用全局变量。但是我的过程没有采用全球价值观
import multiprocessing
count = 0
def smile_detection(thread_name):
global count
for x in range(10):
count +=1
print thread_name,count
return count
x = multiprocessing.Process(target=smile_detection, args=("Thread1",))
y = multiprocessing.Process(target=smile_detection, args=("Thread2",))
x.start()
y.start()
Run Code Online (Sandbox Code Playgroud)
我正在输出像
Thread1 1
Thread1 2
.
.
Thread1 9
Thread1 10
Thread2 1
Thread2 2
.
.
Thread2 9
Thread2 10
Run Code Online (Sandbox Code Playgroud)
我想要的是
Thread1 1
Thread1 2
.
.
Thread1 9
Thread1 10
Thread2 11
Thread2 12
.
.
Thread2 19
Thread2 20
Run Code Online (Sandbox Code Playgroud)
我必须要做些什么才能做到这一点?