use*_*855 5 python queue multiprocessing
想象一个倒置的二叉树,其节点A,B,C,D,E,F在级别0上.节点G,H,I在级别1上,节点J在级别2上,节点K在级别3上.
等级1:G = func(A,B),H = func(C,D),I = func(E,F)
等级2:J = func(G,H)
等级3:K = func(J,I).
级别0上的每对节点必须按顺序处理,级别1上的每对节点都可以按任何顺序处理,但结果必须在下一级别必须如图所示进行处理,依此类推,直到我们最终得到最终结果结果,K.
实际问题是计算几何问题,其中固体序列融合在一起.A与B相邻,与C相邻,依此类推.所得到的A和B(G)的熔丝与C和D(H)的熔丝相邻.得到的J和I(K)的熔丝是最终结果.因此,你不能融合G和I,因为它们不相邻.如果某个级别上的节点数不是2的幂,则最终会得到一个必须进一步处理的悬空实体.
由于融合过程计算成本高且内存密集但非常平行,我想使用Python多处理包和某种形式的队列.在计算G = func(A,B)之后,我想将结果G推入队列以进行后续的J = func(G,H)计算.当队列为空时,最后的结果是最终结果.请记住,mp.queue不一定会产生结果FIFO,因为I = func(E,F)可能在H = func(C,D)之前完成
我想出了一些(坏的)解决方案,但我确信有一个优雅的解决方案超出了我的掌握.建议?
我无法为队列提出一种智能设计,但您可以轻松地用另一个进程替换队列,在我的示例中我将其称为WorkerManager. 该进程收集所有Worker进程的结果,并仅在有两个相邻数据包等待处理时启动新的工作进程。这样,您就永远不会尝试连接不相邻的结果,因此您可以忽略“级别”并在下一对准备好后立即启动计算。
from multiprocessing import Process, Queue
class Result(object):
'''Result from start to end.'''
def __init__(self, start, end, data):
self.start = start
self.end = end
self.data = data
class Worker(Process):
'''Joins two results into one result.'''
def __init__(self, result_queue, pair):
self.result_queue = result_queue
self.pair = pair
super(Worker, self).__init__()
def run(self):
left, right = self.pair
result = Result(left.start, right.end,
'(%s, %s)' % (left.data, right.data))
self.result_queue.put(result)
class WorkerManager(Process):
'''
Takes results from result_queue, pairs them
and assigns workers to process them.
Returns final result into final_queue.
'''
def __init__(self, result_queue, final_queue, start, end):
self._result_queue = result_queue
self._final_queue = final_queue
self._start = start
self._end = end
self._results = []
super(WorkerManager, self).__init__()
def run(self):
while True:
result = self._result_queue.get()
self._add_result(result)
if self._has_final_result():
self._final_queue.put(self._get_final_result())
return
pair = self._find_adjacent_pair()
if pair:
self._start_worker(pair)
def _add_result(self, result):
self._results.append(result)
self._results.sort(key=lambda result: result.start)
def _has_final_result(self):
return (len(self._results) == 1
and self._results[0].start == self._start
and self._results[0].end == self._end)
def _get_final_result(self):
return self._results[0]
def _find_adjacent_pair(self):
for i in xrange(len(self._results) - 1):
left, right = self._results[i], self._results[i + 1]
if left.end == right.start:
self._results = self._results[:i] + self._results[i + 2:]
return left, right
def _start_worker(self, pair):
worker = Worker(self._result_queue, pair)
worker.start()
if __name__ == '__main__':
DATA = [Result(i, i + 1, str(i)) for i in xrange(6)]
result_queue = Queue()
final_queue = Queue()
start = 0
end = len(DATA)
man = WorkerManager(result_queue, final_queue, start, end)
man.start()
for res in DATA:
result_queue.put(res)
final = final_queue.get()
print final.start
# 0
print final.end
# 6
print final.data
# For example:
# (((0, 1), (2, 3)), (4, 5))
Run Code Online (Sandbox Code Playgroud)
对于我的示例,我使用了一个简单的方法Worker,它返回括号中的给定数据,并用逗号分隔,但您可以在其中放置任何计算。就我而言,最终结果是(((0, 1), (2, 3)), (4, 5))这意味着算法在计算之前计算了(0, 1)和,然后将结果与. 我希望这就是您正在寻找的。(2, 3)((0, 1), (2, 3))(4, 5)