imap_unordered, but with lazily flattened generators

Ant*_*ala 7 python parallel-processing multiprocessing python-3.x python-multiprocessing

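I have a problem that I can already solve with multiprocessing.Pool, but the solution is far from ideal. Namely, I have a fairly small set of inputs, each of which maps to a large dataset. I can use imap_unordered with a function that returns a list, but this is far from efficient, because each large dataset has to be built up and returned as a complete list.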

My function could produce them as a generator to reduce latency, but I cannot return a generator from a subprocess.
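The underlying reason is that Pool sends each return value back to the parent process by pickling it, and generator objects cannot be pickled. A minimal check (try_pickle is a hypothetical helper written only for this illustration):

```python
import pickle


def generate(x):
    """The question's dummy generator, without the sleep."""
    for j in range(x, x + 10):
        yield j


def try_pickle(obj):
    """Hypothetical helper: return None if obj pickles cleanly, else the TypeError raised."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return exc
    return None


# A materialized list pickles fine, so a worker can send it back to the parent ...
print(try_pickle(list(generate(0))))  # None
# ... but a generator object does not, so a worker cannot return one.
print(repr(try_pickle(generate(0))))
```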

A dummy example:

import time
import multiprocessing


def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)

def wrapper(x):
    return list(generate(x))


if __name__ == "__main__":  # required where workers are spawned (Windows, macOS)
    with multiprocessing.Pool(10) as pool:
        for ready in pool.imap_unordered(wrapper, range(0, 100, 10)):
            for item in set(ready):  # to show that order does not matter
                print(item)

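The problem is that although the whole run now takes only a tenth of the sequential running time, I still have to wait 10 seconds for the very first result, which could be available immediately by doing: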

import time


def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


for ready in map(generate, range(0, 100, 10)):
    for item in ready:  # consume lazily; set(ready) would exhaust the generator first
        print(item)

This prints the first item immediately, but takes 100 seconds to run.
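The arithmetic behind these numbers: 10 inputs × 10 items × 1 s of sleep is 100 s sequentially, or about 10 s spread over 10 workers. A small timing sketch for the sequential, lazy version, assuming the sleep is scaled down to 0.01 s so it finishes in about a second (time_to_first_and_all, sequential_lazy and the delay parameter are hypothetical additions):

```python
import time


def generate(x, delay=0.01):
    """The question's dummy generator; the added `delay` parameter replaces the 1 s sleep."""
    for j in range(x, x + 10):
        yield j
        time.sleep(delay)


def time_to_first_and_all(iterable):
    """Consume `iterable`; return (items, seconds to first item, seconds to exhaustion)."""
    start = time.perf_counter()
    first = None
    items = []
    for item in iterable:
        if first is None:
            first = time.perf_counter() - start
        items.append(item)
    return items, first, time.perf_counter() - start


def sequential_lazy():
    """The sequential version: each generator is consumed lazily in the main process."""
    for ready in map(generate, range(0, 100, 10)):
        yield from ready


items, first, total = time_to_first_and_all(sequential_lazy())
# 100 sleeps of 0.01 s: the first item is immediate, the whole run takes at least 1 s
print(f"first item after {first:.3f} s, all {len(items)} items after {total:.2f} s")
```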

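What I cannot do is subdivide the problem any further; the generators in the subprocesses need to be evaluated lazily by the consumer: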

def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


with multiprocessing.Pool(10) as pool:
    for item in pool.??flatmap_unordered??(generate, range(0, 100, 10)):
        print(item)

It would print the first item immediately, yet the whole run would take only about 10 seconds!

How can I achieve this?

Mis*_*agi 2

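There seems to be no built-in way for a Pool to collect generated items incrementally. However, it is fairly straightforward to write your own "flat map" helper.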

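The general idea is to have a wrapper in the pool processes that runs the iterator and pushes each individual item onto a queue. The main process then just runs a simple loop that gets and yields each item.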

import functools
import multiprocessing


def flatmap(pool: multiprocessing.Pool, func, iterable, chunksize=None):
    """A flattening, unordered equivalent of Pool.map()"""
    # use a queue to stream individual results from the worker processes;
    # a plain multiprocessing.Queue cannot be passed to Pool tasks as an argument,
    # but a Manager-backed queue proxy can
    queue = multiprocessing.Manager().Queue()
    # reuse the task management and mapping of Pool
    pool.map_async(
        functools.partial(_flat_mapper, queue, func),
        iterable,
        chunksize,
        # on success, push a bare None to signal that everything is done
        callback=lambda _: queue.put(None),
        # on failure, push the exception so the consumer can re-raise it
        error_callback=lambda err: queue.put((None, err)),
    )
    # yield each result as it becomes available
    while True:
        item = queue.get()
        if item is None:
            break
        result, err = item
        if err is None:
            yield result
        else:
            raise err


def _flat_mapper(queue: multiprocessing.Queue, func, *args):
    """Helper to run `func(*args) -> iter` and stream each item to the queue"""
    for item in func(*args):
        queue.put((item, None))
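The queue carries three kinds of messages: (item, None) for every produced item, (None, err) when a task fails, and a bare None once all tasks have finished. Because items are always wrapped in a tuple, a generator that itself yields None cannot be mistaken for the end-of-stream sentinel. The single-process sketch below replays that protocol, with a threading.Thread and a queue.Queue standing in for the pool and the manager queue (both substitutions are assumptions made for illustration; consume and produce are hypothetical names):

```python
import queue
import threading


def consume(q):
    """The consumer loop from flatmap: unwrap (item, err) tuples until the None sentinel."""
    while True:
        message = q.get()
        if message is None:  # bare None: every task has finished
            break
        result, err = message
        if err is None:
            yield result
        else:
            raise err  # re-raise the worker's exception in the consumer


def produce(q, items, error=None):
    """Stand-in for a pool worker plus the map_async callbacks."""
    for item in items:
        q.put((item, None))  # wrapped, so a yielded None is not the sentinel
    if error is None:
        q.put(None)  # what the success callback pushes
    else:
        q.put((None, error))  # what the error callback pushes


ok = queue.Queue()
threading.Thread(target=produce, args=(ok, [1, None, 3])).start()
print(list(consume(ok)))  # [1, None, 3]

failing = queue.Queue()
threading.Thread(target=produce, args=(failing, [1], ValueError("boom"))).start()
try:
    list(consume(failing))
except ValueError as exc:
    print("re-raised:", exc)  # re-raised: boom
```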

If desired, the Pool type itself could be patched to provide flatmap as a method instead of a function.


The flatmap helper can be used directly to consume the results of the generators. For the example case, it finishes in just over 10 seconds.
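An end-to-end sketch for the example case. It repeats the answer's helper (spelled _flat_mapper here) so that it runs on its own, and shortens the sleep from 1 s to 0.05 s, so a sequential run would take about 5 s while 10 workers should finish in roughly 0.5 s plus process start-up. It also attaches flatmap to multiprocessing.pool.Pool, which is one way to do the patching mentioned above: since flatmap takes the pool as its first parameter, a plain attribute assignment makes it a method. run_demo is a hypothetical driver.

```python
import functools
import multiprocessing
import multiprocessing.pool
import time


def generate(x):
    """The question's dummy generator, with the 1 s sleep shortened to 0.05 s."""
    for j in range(x, x + 10):
        yield j
        time.sleep(0.05)


def _flat_mapper(queue, func, *args):
    """Worker side: run func(*args) and stream each item to the queue."""
    for item in func(*args):
        queue.put((item, None))


def flatmap(pool, func, iterable, chunksize=None):
    """Consumer side: the answer's flattening, unordered equivalent of Pool.map()."""
    queue = multiprocessing.Manager().Queue()
    pool.map_async(
        functools.partial(_flat_mapper, queue, func),
        iterable,
        chunksize,
        callback=lambda _: queue.put(None),  # all tasks finished
        error_callback=lambda err: queue.put((None, err)),  # a task failed
    )
    while True:
        item = queue.get()
        if item is None:
            break
        result, err = item
        if err is None:
            yield result
        else:
            raise err


# Optional patch: because flatmap takes the pool as its first parameter,
# assigning it to the class turns it into a method, pool.flatmap(func, iterable).
multiprocessing.pool.Pool.flatmap = flatmap


def run_demo():
    """Hypothetical driver: returns (items, seconds to first item, total seconds)."""
    start = time.perf_counter()
    first = None
    items = []
    with multiprocessing.Pool(10) as pool:
        for item in pool.flatmap(generate, range(0, 100, 10)):
            if first is None:
                first = time.perf_counter() - start
            items.append(item)
    return items, first, time.perf_counter() - start


if __name__ == "__main__":
    items, first, total = run_demo()
    print(f"{len(items)} items, first after {first:.2f} s, all after {total:.2f} s")
```

Items from different inputs arrive interleaved and unordered, which is why the check above sorts them; the first one is available as soon as any worker has produced it.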
