在Python中实现一种特殊类型的多处理队列

use*_*855 5 python queue multiprocessing

想象一个倒置的二叉树,其节点A,B,C,D,E,F在级别0上.节点G,H,I在级别1上,节点J在级别2上,节点K在级别3上.

等级1:G = func(A,B),H = func(C,D),I = func(E,F)

等级2:J = func(G,H)

等级3:K = func(J,I).

级别0上的每对节点必须按顺序处理,级别1上的每对节点都可以按任何顺序处理,但结果必须在下一级别必须如图所示进行处理,依此类推,直到我们最终得到最终结果结果,K.

实际问题是计算几何问题,其中固体序列融合在一起.A与B相邻,与C相邻,依此类推.所得到的A和B(G)的熔丝与C和D(H)的熔丝相邻.得到的J和I(K)的熔丝是最终结果.因此,你不能融合G和I,因为它们不相邻.如果某个级别上的节点数不是2的幂,则最终会得到一个必须进一步处理的悬空实体.

由于融合过程计算成本高且内存密集但非常平行,我想使用Python多处理包和某种形式的队列.在计算G = func(A,B)之后,我想将结果G推入队列以进行后续的J = func(G,H)计算.当队列为空时,最后的结果是最终结果.请记住,mp.queue不一定会产生结果FIFO,因为I = func(E,F)可能在H = func(C,D)之前完成

我想出了一些(坏的)解决方案,但我确信有一个优雅的解决方案超出了我的掌握.建议?

Dzi*_*inX 0

我无法为队列提出一种智能设计,但您可以轻松地用另一个进程替换队列,在我的示例中我将其称为WorkerManager. 该进程收集所有Worker进程的结果,并仅在有两个相邻数据包等待处理时启动新的工作进程。这样,您就永远不会尝试连接不相邻的结果,因此您可以忽略“级别”并在下一对准备好后立即启动计算。

from multiprocessing import Process, Queue

class Result(object):
    '''Result from start to end.'''
    def __init__(self, start, end, data):
        self.start = start
        self.end = end
        self.data = data


class Worker(Process):
    '''Joins two results into one result.'''
    def __init__(self, result_queue, pair):
        self.result_queue = result_queue
        self.pair = pair
        super(Worker, self).__init__()

    def run(self):
        left, right = self.pair
        result = Result(left.start, right.end,
                        '(%s, %s)' % (left.data, right.data))
        self.result_queue.put(result)


class WorkerManager(Process):
    '''
    Takes results from result_queue, pairs them
    and assigns workers to process them.
    Returns final result into final_queue.
    '''
    def __init__(self, result_queue, final_queue, start, end):
        self._result_queue = result_queue
        self._final_queue = final_queue
        self._start = start
        self._end = end
        self._results = []
        super(WorkerManager, self).__init__()

    def run(self):
        while True:
            result = self._result_queue.get()
            self._add_result(result)
            if self._has_final_result():
                self._final_queue.put(self._get_final_result())
                return
            pair = self._find_adjacent_pair()
            if pair:
                self._start_worker(pair)

    def _add_result(self, result):
        self._results.append(result)
        self._results.sort(key=lambda result: result.start)

    def _has_final_result(self):
        return (len(self._results) == 1
                and self._results[0].start == self._start
                and self._results[0].end == self._end)

    def _get_final_result(self):
        return self._results[0]

    def _find_adjacent_pair(self):
        for i in xrange(len(self._results) - 1):
            left, right = self._results[i], self._results[i + 1]
            if left.end == right.start:
                self._results = self._results[:i] + self._results[i + 2:]
                return left, right

    def _start_worker(self, pair):
        worker = Worker(self._result_queue, pair)
        worker.start()

if __name__ == '__main__':
    DATA = [Result(i, i + 1, str(i)) for i in xrange(6)]
    result_queue = Queue()
    final_queue = Queue()
    start = 0
    end = len(DATA)
    man = WorkerManager(result_queue, final_queue, start, end)
    man.start()
    for res in DATA:
        result_queue.put(res)
    final = final_queue.get()
    print final.start
    # 0
    print final.end
    # 6
    print final.data
    # For example:
    # (((0, 1), (2, 3)), (4, 5))
Run Code Online (Sandbox Code Playgroud)

对于我的示例,我使用了一个简单的方法Worker,它返回括号中的给定数据,并用逗号分隔,但您可以在其中放置任何计算。就我而言,最终结果是(((0, 1), (2, 3)), (4, 5))这意味着算法在计算之前计算了(0, 1)和,然后将结果与. 我希望这就是您正在寻找的。(2, 3)((0, 1), (2, 3))(4, 5)