Given N generators, is it possible to create a generator that runs them in parallel processes and yields the zip of those generators?

cre*_*esk 6 python parallel-processing generator multiprocessing python-multiprocessing

Suppose I have N generators gen_1, ..., gen_N, each of which yields the same number of values. I want a generator gen that runs gen_1, ..., gen_N in N parallel processes and yields (next(gen_1), next(gen_2), ..., next(gen_N)).

That is what I want:

def gen():
    yield (next(gen_1), next(gen_2), ..., next(gen_N))

in such a way that each gen_i runs in its own process. Is it possible to do this? I tried it in the following dummy example, without success:

from multiprocessing import Process

A = range(4)

def gen(a):
    B = ['a', 'b', 'c']
    for b in B:
        yield b + str(a)

def target(g):
    return next(g)

processes = [Process(target=target, args=(gen(a),)) for a in A]

for p in processes:
    p.start()

for p in processes:
    p.join()

but I get the error TypeError: cannot pickle 'generator' object.
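That error is expected: multiprocessing needs to pickle the target's arguments to send them to a child process (with the default spawn start method on Windows and macOS), and generator objects can't be pickled. A minimal sketch reproducing it (the helper g() is just for illustration):

import pickle

def g():
    yield 1

pickle.dumps(g())  # TypeError: cannot pickle 'generator' object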

Edit:

I modified @darkonaut's answer to fit my needs and am posting it in case some of you find it useful. First we define a couple of utility functions:

from itertools import zip_longest
from typing import List, Generator


def grouper(iterable, n, fillvalue=iter([])):
    "Collect data into fixed-length chunks or blocks"
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def split_generators_into_batches(generators: List[Generator], n_splits):
    """Split the generators into at most `n_splits` chunks and zip each chunk."""
    chunks = grouper(generators, len(generators) // n_splits + 1)
    return [zip_longest(*chunk) for chunk in chunks]
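As a quick sanity check of these helpers (a minimal sketch, reusing gen() from the dummy example above): with four generators and n_splits=3 the chunk size is 4 // 3 + 1 == 2, so we get two batches of two zipped generators. Note that the split can produce fewer batches than n_splits, which is why the class below sizes its barrier from the actual batch count:

gens = [gen(a) for a in range(4)]
batches = split_generators_into_batches(gens, n_splits=3)

print(len(batches))      # 2
print(next(batches[0]))  # ('a0', 'a1') -- one value from each generator in the batch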

The following class takes care of splitting an arbitrary number of generators into n (the number of processes) batches and processing them to yield the desired result:

import itertools
import multiprocessing as mp


class GeneratorParallelProcessor:
    SENTINEL = 'S'  # single character: itertools.chain() flattens it back to itself

    def __init__(self, generators, n_processes=2 * mp.cpu_count()):
        self.generators = split_generators_into_batches(list(generators), n_processes)
        # The split can yield fewer batches than requested, so size the barrier
        # and the sentinel list from the actual batch count.
        self.n_processes = len(self.generators)
        self.queue = mp.SimpleQueue()
        self.barrier = mp.Barrier(self.n_processes + 1)
        self.sentinels = [self.SENTINEL] * self.n_processes

        self.processes = [
            mp.Process(target=self._worker, args=(self.barrier, self.queue, gen))
            for gen in self.generators
        ]

    def process(self):
        for p in self.processes:
            p.start()

        while True:
            results = list(itertools.chain(*(self.queue.get() for _ in self.generators)))
            if results != self.sentinels:
                yield results
                self.barrier.wait()
            else:
                break

        for p in self.processes:
            p.join()

    def _worker(self, barrier, queue, generator):
        for x in generator:
            queue.put(x)
            barrier.wait()
        queue.put(self.SENTINEL)

To use it, simply do the following:

parallel_processor = GeneratorParallelProcessor(generators)

for grouped_generator in parallel_processor.process():
    output_handler(grouped_generator)

Dar*_*aut 2

It's possible to get such a "Unified Parallel Generator (UPG)" (an attempt to coin a name) with some effort, but as @jasonharper already mentioned, you definitely need to assemble the sub-generators within the child processes, since a running generator can't be pickled.

The pattern below is reusable, with only the generator function gen() being custom to this example. The design uses multiprocessing.SimpleQueue for returning generator results to the parent and multiprocessing.Barrier for synchronization.

Calling Barrier.wait() blocks the caller (a thread in any process) until the specified number of parties has called .wait(), whereupon all threads currently waiting on the Barrier are released simultaneously. The Barrier here ensures that further generator results only start being computed after the parent has received all results of an iteration, which can be desirable to keep overall memory consumption in check.
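A minimal, self-contained sketch of that Barrier behaviour (the helper name wait_then_print and the party counts are illustrative only):

import multiprocessing as mp

def wait_then_print(barrier, i):
    barrier.wait()  # blocks until all three parties have called wait()
    print(f"process {i} released")

if __name__ == '__main__':
    barrier = mp.Barrier(3)  # parties: two children + the parent
    children = [mp.Process(target=wait_then_print, args=(barrier, i)) for i in range(2)]
    for p in children:
        p.start()
    barrier.wait()  # the parent is the third party; all three release together
    for p in children:
        p.join()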

The number of parallel workers equals the number of argument tuples you provide within the gen_args_tuples iterable, so gen_args_tuples=zip(range(4)) will use four workers, for example. See the comments in the code for further details.
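For clarity, zip(range(4)) just packs each value into a 1-tuple of arguments, one tuple per worker:

list(zip(range(4)))  # [(0,), (1,), (2,), (3,)] -> workers run gen(0) .. gen(3)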

import multiprocessing as mp

SENTINEL = 'SENTINEL'


def gen(a):
    """Your individual generator function."""
    lst = ['a', 'b', 'c']
    for ch in lst:
        for _ in range(int(10e6)):  # some dummy computation
            pass
        yield ch + str(a)


def _worker(i, barrier, queue, gen_func, gen_args):
    for x in gen_func(*gen_args):
        print(f"WORKER-{i} sending item.")
        queue.put((i, x))
        barrier.wait()
    queue.put(SENTINEL)


def parallel_gen(gen_func, gen_args_tuples):
    """Construct and yield from parallel generators
     build from `gen_func(gen_args)`.
     """
    gen_args_tuples = list(gen_args_tuples)  # ensure list
    n_gens = len(gen_args_tuples)
    sentinels = [SENTINEL] * n_gens
    queue = mp.SimpleQueue()
    barrier = mp.Barrier(n_gens + 1)  # `parties`: + 1 for parent

    processes = [
        mp.Process(target=_worker, args=(i, barrier, queue, gen_func, args))
        for i, args in enumerate(gen_args_tuples)
    ]

    for p in processes:
        p.start()

    while True:
        results = [queue.get() for _ in range(n_gens)]
        if results != sentinels:
            results.sort()
            yield tuple(r[1] for r in results)  # sort and drop ids
            barrier.wait()  # all workers are waiting
            # already, so this will unblock immediately
        else:
            break

    for p in processes:
        p.join()


if __name__ == '__main__':

    for res in parallel_gen(gen_func=gen, gen_args_tuples=zip(range(4))):
        print(res)

Output (the WORKER lines can interleave differently from run to run):

WORKER-0 sending item.
WORKER-1 sending item.
WORKER-2 sending item.
WORKER-3 sending item.
('a0', 'a1', 'a2', 'a3')
WORKER-1 sending item.
WORKER-3 sending item.
WORKER-0 sending item.
WORKER-2 sending item.
('b0', 'b1', 'b2', 'b3')
WORKER-2 sending item.
WORKER-0 sending item.
WORKER-3 sending item.
WORKER-1 sending item.
('c0', 'c1', 'c2', 'c3')