Parallel recursive function in Python?

Ale*_*oks 6 python concurrency recursion multiprocessing

How do I parallelise a recursive function in Python?

My function looks like this:

def f(x, depth):
    if x == 0:
        return ...
    else:
        return [x] + map(lambda x: f(x, depth-1), list_of_values(x))

def list_of_values(x):
    # heavy compute, pure function

When I try to parallelise it with multiprocessing.Pool.map, Windows opens an endless number of processes and hangs.

What is a good (preferably simple) way to parallelise it (for a single multi-core machine)?

Here is the code that hangs:

from multiprocessing import Pool
pool = Pool(processes=4)
def f(x, depth):
    if x == 0:
        return ...
    else:
        return [x] + pool.map(lambda x: f(x, depth-1), list_of_values(x))

def list_of_values(x):
    # heavy compute, pure function
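For reference, the hang above has two likely causes: on Windows, multiprocessing starts workers by re-importing the module, so a pool created at import time gets re-created in every worker (hence the process explosion), and pool.map cannot pickle a lambda. A minimal sketch (with a stand-in list_of_values and a simplified recursion, not the asker's real function) that keeps the pool under the __main__ guard and passes only named functions to the pool:

```python
from multiprocessing import Pool

def list_of_values(x):
    '''Stand-in for the heavy pure function.'''
    return list(range(x))

def expand(x, depth):
    '''Plain serial recursion; the pool is only used at the top level.'''
    if x == 0 or depth == 0:
        return [x]
    return [x] + [expand(v, depth - 1) for v in list_of_values(x)]

if __name__ == '__main__':
    # the guard stops Windows workers from re-creating the pool
    # when they re-import this module
    with Pool(processes=4) as pool:
        # a named function can be pickled; a lambda cannot
        first_level = pool.map(list_of_values, [2, 3, 4])
        print([[expand(v, 2) for v in values] for values in first_level])
```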

and*_*oke 5

OK, sorry for the problems with this.

I'll answer a slightly different question, where f() returns the sum of the values in the list. That is because it's not clear to me from your example what the return type of f() would be, and using an integer makes the code simple to understand.

This is complex because there are two different things happening in parallel:

  1. the calculation of the expensive function in the pool
  2. the recursive expansion of f()

I am very careful to only use the pool to calculate the expensive function. That way we don't get an "explosion" of processes. But because this is asynchronous we need to postpone a lot of work to the callback that the worker calls once the expensive function is done.

More than that, we need to use a countdown latch so that we know when all the separate sub-calls to f() are complete.

There may be a simpler way (I'm pretty sure there is, but I need to do other things), but perhaps this gives you an idea of what is possible:

from multiprocessing import Pool, Value, RawArray, RLock
from time import sleep

class Latch:

    '''A countdown latch that lets us wait for a job of "n" parts'''

    def __init__(self, n):
        self.__counter = Value('i', n)
        self.__lock = RLock()

    def decrement(self):
        with self.__lock:
            self.__counter.value -= 1
            print('dec', self.read())
        return self.read() == 0

    def read(self):
        with self.__lock:
            return self.__counter.value

    def join(self):
        while self.read():
            sleep(1)


def list_of_values(x):
    '''An expensive function'''
    print(x, ': thinking...')
    sleep(1)
    print(x, ': thought')
    return list(range(x))


pool = Pool() # note: on Windows this would need to sit under an if __name__ == '__main__' guard


def async_f(x, on_complete=None):
    '''Return the sum of the values in the expensive list'''
    if x == 0:
        on_complete(0) # no list, return 0
    else:
        n = x # need to know size of result beforehand
        latch = Latch(n) # wait for n entries to be calculated
        result = RawArray('i', n+1) # where we will assemble the map
        def delayed_map(values):
            '''This is the callback for the pool async process - it runs
               in a separate thread within this process once the
               expensive list has been calculated and orchestrates the
               mapping of f over the result.'''
            result[0] = x # first value in list is x
            for (i, v) in enumerate(values): # enumerate yields (index, value)
                def callback(fx, i=i):
                    '''This is the callback passed to f() and is called when 
                       the function completes.  If it is the last of all the
                       calls in the map then it calls on_complete() (ie another
                       instance of this function) for the calling f().'''
                    result[i+1] = fx
                    if latch.decrement(): # have completed list
                        # at this point result contains [x]+map(f, ...)
                        on_complete(sum(result)) # so return sum
                async_f(v, callback)
        # Ask worker to generate list then call delayed_map
        pool.apply_async(list_of_values, [x], callback=delayed_map)


def run():
    '''Tie into the same mechanism as above, for the final value.'''
    result = Value('i')
    latch = Latch(1)
    def final_callback(value):
        result.value = value
        latch.decrement()
    async_f(6, final_callback)
    latch.join() # wait for everything to complete
    return result.value


print(run())

p.s. I'm using python3.2 and the ugliness above is because we are postponing computation of the final results (going back up the tree) until later. It's possible something like generators or futures could simplify things.
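As a rough illustration of the futures idea, here is a sketch of the same sum-of-values computation using concurrent.futures (available from Python 3.2 via the stdlib). It is an assumption that this matches what the answer has in mind: only the expensive call is submitted to the process pool, while a small per-node thread pool fans the recursive calls out so that siblings' expensive calls can overlap in the workers:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def list_of_values(x):
    '''Stand-in for the expensive pure function.'''
    return list(range(x))

def f(x, procs):
    '''Sum-of-values version of f(), as in the answer above.'''
    if x == 0:
        return 0
    # only the expensive call is shipped to a worker
    values = procs.submit(list_of_values, x).result()
    # each node gets its own thread pool sized to its children,
    # so sibling subtrees run concurrently without deadlocking
    with ThreadPoolExecutor(max_workers=max(len(values), 1)) as tp:
        return x + sum(tp.map(lambda v: f(v, procs), values))

if __name__ == '__main__':
    with ProcessPoolExecutor() as procs:
        print(f(6, procs))  # 63 for this list_of_values
```

The per-node thread pools are wasteful for wide trees, but they avoid the deadlock you would get from nested blocking submissions into one bounded pool.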

Also, I suspect you need a cache to avoid needlessly recalculating the expensive function when it is called with the same argument as before.
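Since list_of_values is stated to be pure, one simple option is functools.lru_cache (a sketch with the stand-in function, not the asker's real one):

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def list_of_values(x):
    '''Heavy pure function; repeated calls with the same x now hit the cache.'''
    return tuple(range(x))  # return something immutable so the cache is safe
```

Note the cache lives per process: with a multiprocessing pool each worker keeps its own copy, so it may be more effective to memoise in the parent, where the jobs are dispatched, rather than inside the workers.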

See also yaniv's answer (parallel recursive function in python?), which seems to be an alternative way to reverse the order of the evaluation by making the depth explicit.