Python 多处理池作为装饰器

Kar*_*arl 6 python multiprocessing python-multiprocessing

我正在处理经常需要使用 pythonmultiprocessing Pool类的代码。这会产生大量如下所示的代码:

import time
from multiprocessing import Pool
from functools import partial

def test_func(x):
    time.sleep(1)
    return x

def test_func_parallel(iterable, processes):
    p = Pool(processes=processes)
    output = p.map(test_func, iterable)
    p.close()
    return output

Run Code Online (Sandbox Code Playgroud)

这可以变得更一般:

def parallel(func, iterable, **kwargs):
    func = partial(func, **kwargs)
    p = Pool(processes=6)
    out = p.map(func, iterable)
    p.close()
    return out
Run Code Online (Sandbox Code Playgroud)

这是可行的,但向其他每个函数添加并行包装器会使代码复杂化。我真正想要的是让它作为装饰器工作。像这样的东西:

def parallel(num_processes):
    def parallel_decorator(func, num_processes=num_processes):
        def parallel_wrapper(iterable, **kwargs):
            func = partial(func, **kwargs)
            p = Pool(processes=num_processes)
            output = p.map(func, iterable)
            p.close()
            return output

        return parallel_wrapper
    return parallel_decorator
Run Code Online (Sandbox Code Playgroud)

可以按如下方式使用:

@parallel(6)
def test_func(x):
    time.sleep(1)
    return x
Run Code Online (Sandbox Code Playgroud)

这由于泡菜原因而失败

Can't pickle <function test1 at 0x117473268>: it's not the same object as __main__.test1

我已经阅读了一些关于相关问题的帖子,但它们都实现了一个解决方案,其中多处理在装饰器之外执行。有谁知道使这项工作的方法?

HTF*_*HTF 3

如果您不介意不使用装饰器的语法糖(@符号),那么这样的东西应该可以工作:

import functools
import time

from multiprocessing import Pool


def parallel(func=None, **options):
    if func is None:
        return functools.partial(parallel, **options)

    def wrapper(iterable, **kwargs):
        processes = options["processes"]

        with Pool(processes) as pool:
            result = pool.map(func, iterable)

        return result

    return wrapper


def test(i):
    time.sleep(1)
    print(f"{i}: {i * i}")

test_parallel = parallel(test, processes=6)


def main():
    test_parallel(range(10))


if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)