将functools.lru_cache与multiprocessing.Pool结合起来

Feo*_*ran 6 python recursion caching multiprocessing

我有一个相当复杂的递归函数,有许多参数(Obara-Saika-Scheme,如果有人想知道),我想更有效地推测.作为我应用的第一步@functools.lru_cache.作为第二步,我现在想multiprocessing.Pool用于异步评估一长串输入参数.

调整functools Python文档中第二个示例并添加我拥有的工作池:

from multiprocessing import Pool
from functools import lru_cache

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

with Pool(processes=4) as pool:
    for i in range(10):
        res = pool.apply_async(fibonacci, (i,))
        print(res.get())

print(fibonacci.cache_info())
Run Code Online (Sandbox Code Playgroud)

问题1

如何通过不同的工作人员共享缓存.另一个问题(如何共享缓存?)是在问类似的事情,但我无法让它工作.以下是我对此的两种失败方法.

使用multiprocessing.Pool:

from multiprocessing import Pool
from functools import lru_cache
import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)   # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

res = []
with Pool(processes=4) as pool:

    # submit first task
    res.append(pool.apply_async(fibonacci, (5,)).get())

    # give fibonacci() some time to fill its cache
    time.sleep(1)

    # submit second task
    res.append(pool.apply_async(fibonacci, (3,)).get())

print(res)
Run Code Online (Sandbox Code Playgroud)

使用concurrent.futures:

import concurrent.futures
from functools import lru_cache

import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)   # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

    @lru_cache(maxsize=10)
    def fib_async(n):
        print('calculating fib_async(%i)' %n)
        if n < 2:
            return n
        return fibonacci(n-1) + fibonacci(n-2)

    res = []

    # submit first task
    res.append(executor.submit(fib_async, 5))

    # give fib_async() some time to fill its cache
    time.sleep(1)

    # submit second task
    res.append(executor.submit(fib_async, 3))


res = [e.result() for e in res]

print(res)
Run Code Online (Sandbox Code Playgroud)

两者都产生基本相同的输出,表明第二个任务重新计算fibonacci(2),尽管第一个任务已经必须计算它.如何共享缓存?

这应该可以加快速度,但是如果重复调用的时间非常严重,仍然会出现问题:worker1当前评估的调用尚未缓存,而worker2可能会开始评估同样的事情.这让我想到:

问题2

计算Fibonacci数在其递归中是相当线性的,即只有一个参数递减.我的功能更复杂,我可以使用的东西不仅可以管理已经计算的输入参数,还可以跟踪当前正在计算的内容.

要清楚:我想对递归函数进行许多并行调用,这将产生对递归函数的许多新调用.

一个棘手的问题可能是避免将一个调用直接分配给一个worker,因为当递归深度超过worker数时会导致死锁.

我可以使用这样的东西吗?或者我需要自己构建一些东西?我stumpled在multiprocessing.managersconcurrent.futures.ProcessPoolExecutor这可能是有益的.但我可以使用一些帮助来开始.

Den*_*1al 9

由于您所需的功能受 CPU 限制,因此您multiprocessing对此任务的选择是正确的。

该函数@lru_cache使用内存缓存。每个 python 进程都包含自己的内存块,因此您将生成 2 个独立的缓存(位于不同的内存空间上)。

如果您想同步这些缓存,则需要使用某种内存同步机制,例如锁等。默认lru_cache方法不进行多重处理,但您可以自己轻松实现。

只需使用共享字典(这里是一个很好的例子)来保存缓存的项目,并使用锁包装对该字典的访问(此处供参考的是python wiki 页面)。这样您就可以跨进程共享字典,同时保持访问安全。