bux*_*bux 7 python multiprocessing
我面临多处理问题.多处理堆栈溢出问题的很大一部分没有我的情况复杂,也没有回答它.有些人投票说这个问题可能重复,但我的情况有所不同,在我的情况下,共享DICT在进程之间被修改了:
我有一个程序遵循这个简化的生命周期:
A. Initialize DATA dict
B. Initialize 4 subprocess workers
C. Execute code in each workers (worker massively read DATA dict)
D. Wait workers job is done
E. Modify DATA dict content
F. Go to C
Run Code Online (Sandbox Code Playgroud)
性能是问题的一个非常重要的方面.我尝试了许多正面和负面的解决方案:
在步骤中B
,DICT
变量被分叉到子流程环境中.但经过一步E
子过程无法看到变化.
在步骤A
dict创建时multiprocessing.Manager
(请参阅此处的 "服务器进程" ).
multiprocessing.Manager
使用序列化层(我不太了解它,但它能够与网络上的进程一起工作),这对性能有害.multiprocessing.Value
并multiprocessing.Array
允许使用共享内存.我尝试用几个替换我的dict multiprocessing.Value
并且multiprocessing.Array
像这样:
用dict:
manager = multiprocessing.Manager()
dict = manager.dict()
dict['positions'] = [42, 165]
dict['on_position_42'] = 1555897
dict['on_position_165'] = 1548792
Run Code Online (Sandbox Code Playgroud)
替换为字典multiprocessing.Value
和multiprocessing.Array
:
positions = multiprocessing.Array('i', [42, 165])
on_position_42 = multiprocessing.Value('i', 1555897)
on_position_165 = multiprocessing.Value('i', 1548792)
Run Code Online (Sandbox Code Playgroud)
但在步骤E
我将需要创建新的multiprocessing.Value
和multiprocessing.Array
,例如:
positions.value = [42, 165, 322]
# create new multiprocessing.Value for 322
on_position_322 = multiprocessing.Value('i', 2258777)
Run Code Online (Sandbox Code Playgroud)
然后在步骤C
,on_position_322
工人将是未知的.如果我尝试通过管道发送multiprocessing.Value
或multiprocessing.Array
子进程,将导致"同步对象应该只通过继承在进程之间共享"错误.
multiprocessing.Value
和multiprocessing.Array
?我知道这是一种可能性,但我必须对内存数据库与multiprocessing.Manager
dict进行对比.
是否存在使用multiprocessing.Value
和multiprocessing.Array
生命周期的方式,考虑创建新的multiprocessing.Value
和multiprocessing.Array
?
或者更一般地说,考虑到这个生命周期,最有效的策略是什么?
注意:我之前尝试过另一种策略,其中步骤F
是"转到B"(在每个周期重新创建新工人).但工人的分岔环境太长了:最大的DICT
是叉子.
小智 1
由于您只是在主进程中读取字典并更新它,因此您可以使用 JoinableQueue 来传递字典并等待工作人员完成。例如
from multiprocessing import Process, JoinableQueue
import time
class Worker(Process):
def __init__(self, queue):
super(Worker, self).__init__()
self.queue = queue
def run(self):
for item in iter(self.queue.get, None):
print item
time.sleep(2)
print 'done'
self.queue.task_done()
self.queue.task_done()
if __name__ == '__main__':
request_queue = JoinableQueue()
num_workers = 4
workers = []
d = {} # A
for _ in range(num_workers):
p = Worker(request_queue) # B
workers.append(p)
p.start()
for i in range(5): # F
for _ in range(num_workers):
request_queue.put(d) # C
request_queue.join() # D
d[i] = i # E
for w in workers:
w.terminate()
w.join()
Run Code Online (Sandbox Code Playgroud)
输出:
{}
{}
{}
{}
done
done
done
done
{0: 0}
{0: 0}
{0: 0}
{0: 0}
done
done
done
done
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
done
done
done
done
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
done
done
done
done
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
done
done
done
done
Run Code Online (Sandbox Code Playgroud)