Lazy ProcessPoolExecutor in Python?

Aka*_*aba 4 python generator multiprocessing python-3.x

I have a large number of tasks to run, and I want to deliver the results through a generator. However, ProcessPoolExecutor with as_completed will eagerly evaluate the results and keep them all in memory. Is there a way to block once a certain number of results have accumulated in the generator?
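(For context, the eager pattern being described looks roughly like this; a minimal sketch, with process_task as a hypothetical stand-in for the real work:)

import concurrent.futures

def process_task(t):
    # Hypothetical stand-in for the real per-task work.
    return t * t

def eager_results(tasks):
    # All tasks are submitted up front, so every pending future (and each
    # result, as it completes) is held in memory even if the caller
    # consumes the generator slowly.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(process_task, t) for t in tasks]
        for future in concurrent.futures.as_completed(futures):
            yield future.result()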

Net*_*ave 5

The idea is to split what you want to process into chunks. I'll use almost the same example as the one in the ProcessPoolExecutor documentation:

import concurrent.futures
import math
import itertools as it

PRIMES = [
    293,
    171,
    293,
    773,
    99,
    5419,
    293,
    171,
    293,
    773,
    99,
    5419,
    293,
    171,
    293,
    773,
    99,
    5419]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

def main_lazy():
    chunks = map(lambda x: it.islice(PRIMES, x, x+4), range(0, len(PRIMES), 4))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = zip(PRIMES, 
                      it.chain.from_iterable(map(lambda x: executor.map(is_prime, x), 
                                                 chunks)))
        for number, prime in (next(results) for _ in range(4)):
            print('%d is prime: %s' % (number, prime))

if __name__ == "__main__":
    main_lazy()

Note the difference between main and main_lazy; let's walk through it:

Instead of handing the executor the whole list of what we want to process, I split it into chunks of size 4 (itertools.islice is handy for this) and map the executor call over the chunks rather than over the whole list. Python 3's map is lazy, so executor.map is only invoked for a chunk when that chunk is requested. executor.map itself is not lazy, so a chunk is evaluated in full the moment it is requested, but the remaining chunks are never touched until we ask for them. As you can see, I only request the first 4 elements of the results, and because itertools.chain.from_iterable also pulls from its input on demand, only the first chunk is consumed and the rest of the iterable is never calculated.
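To make the laziness concrete, here is a small standalone sketch (no executor involved, with a hypothetical evaluate_chunk standing in for the per-chunk executor.map call) showing that map and itertools.chain.from_iterable only pull chunks on demand:

import itertools as it

def evaluate_chunk(chunk):
    # Stand-in for the per-chunk executor.map call.
    print('evaluating chunk', chunk)
    return [x * x for x in chunk]

data = list(range(12))
chunks = (data[i:i + 4] for i in range(0, len(data), 4))
# map is lazy: evaluate_chunk runs only when chain.from_iterable needs
# the next chunk's results.
results = it.chain.from_iterable(map(evaluate_chunk, chunks))
print([next(results) for _ in range(4)])
# 'evaluating chunk [0, 1, 2, 3]' is printed once; the other two chunks
# are never evaluated.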

So, since you wanted a generator, it is as easy as yielding the results from the main_lazy function (note the yield from below: it keeps the executor alive while the caller consumes the results, whereas a plain return would shut the pool down first). You can even abstract the chunk size (you would probably want a proper helper to build the chunks, but that is out of scope):

def main_lazy(chunk_size):
    chunks = map(lambda x: it.islice(PRIMES, x, x+chunk_size), range(0, len(PRIMES), chunk_size))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = zip(PRIMES, 
                      it.chain.from_iterable(map(lambda x: executor.map(is_prime, x), 
                                                 chunks)))
        # yield from keeps the pool open until the results are consumed;
        # a plain "return results" would exit the with block and shut the
        # executor down before any chunk had been submitted.
        yield from results
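Consuming the generator then looks like this (replacing the __main__ block above); only the chunks needed to satisfy the slice are ever submitted to the pool:

if __name__ == "__main__":
    # Pull just the first four results: with chunk_size=4 this submits only
    # the first chunk of PRIMES, leaving the remaining chunks untouched.
    for number, prime in it.islice(main_lazy(4), 4):
        print('%d is prime: %s' % (number, prime))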

  • The problem here (correct me if I'm wrong) is that the use of the parallel processes is not optimal: you only start processing the next chunk once all of the processes from the previous chunk have finished, so there could be a lot of idle time. (2 upvotes)
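One way to cut down that idle time, offered as a hedged sketch rather than as part of the original answer, is a sliding window of in-flight futures: submit up to max_pending tasks, and each time the oldest result is consumed, submit one more, so the workers stay busy while memory stays bounded. lazy_map and max_pending are names invented for this illustration; the usage below reuses PRIMES and is_prime from the example above:

import collections
import concurrent.futures
import itertools as it

def lazy_map(executor, fn, iterable, max_pending=8):
    # Hypothetical sliding-window helper: keep at most max_pending futures
    # in flight, yielding results in input order.
    items = iter(iterable)
    pending = collections.deque(
        executor.submit(fn, x) for x in it.islice(items, max_pending))
    while pending:
        yield pending.popleft().result()  # block only on the oldest task
        for x in it.islice(items, 1):     # refill the window by one task
            pending.append(executor.submit(fn, x))

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, lazy_map(executor, is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))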