pet*_*ete 5 python dictionary python-multiprocessing
我想使用多处理库并行处理字典。
我的问题可以简化为这段代码:
from multiprocessing import Manager,Pool
def modify_dictionary(dictionary):
if((3,3) not in dictionary):
dictionary[(3,3)]=0.
for i in range(100):
dictionary[(3,3)] = dictionary[(3,3)]+1
return 0
if __name__ == "__main__":
manager = Manager()
dictionary = manager.dict(lock=True)
jobargs = [(dictionary) for i in range(5)]
p = Pool(5)
t = p.map(modify_dictionary,jobargs)
p.close()
p.join()
print dictionary[(3,3)]
Run Code Online (Sandbox Code Playgroud)
我创建了一个由 5 个工作人员组成的池,每个工作人员应将字典[(3,3)] 增加 100 次。因此,如果锁定过程正常工作,我预计字典[(3,3)] 在脚本末尾为 500。
然而; 我的代码中的某些内容一定是错误的,因为这不是我得到的:锁定过程似乎没有被“激活”,并且字典[(3,3)]在脚本末尾总是有一个<500的值。
你可以帮帮我吗?
问题出在这一行:
dictionary[(3,3)] = dictionary[(3,3)]+1
Run Code Online (Sandbox Code Playgroud)
这条线上发生了三件事:
但增量部分发生在任何锁定之外。
整个序列必须是原子的,并且必须在所有进程之间同步。否则,这些过程将交错发生,从而导致总数低于预期。
持有锁并增加值可确保您获得预期的 500 总数:
from multiprocessing import Manager,Pool,Lock
lock = Lock()
def modify_array(dictionary):
if((3,3) not in dictionary):
dictionary[(3,3)]=0.
for i in range(100):
with lock:
dictionary[(3,3)] = dictionary[(3,3)]+1
return 0
if __name__ == "__main__":
manager = Manager()
dictionary = manager.dict(lock=True)
jobargs = [(dictionary) for i in range(5)]
p = Pool(5)
t = p.map(modify_array,jobargs)
p.close()
p.join()
print dictionary[(3,3)]
Run Code Online (Sandbox Code Playgroud)