用于多个参数的Python多处理pool.map

use*_*897 458 python multiprocessing

在Python多处理库中,是否有pool.map的变体支持多个参数?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

jfs*_*jfs 442

是否有pool.map的变种,它支持多个参数?

Python 3.3包含pool.starmap()方法:

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()
Run Code Online (Sandbox Code Playgroud)

对于旧版本:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()
Run Code Online (Sandbox Code Playgroud)

产量

1 1
2 1
3 1
Run Code Online (Sandbox Code Playgroud)

请注意如何itertools.izip()itertools.repeat()在这里使用.

由于@unutbu提到的错误,你不能functools.partial()在Python 2.6上使用或类似的功能,所以func_star()应该明确定义简单的包装函数.另请参阅建议的解决方法 .uptimebox

  • F.:您可以像这样解压 `func_star` 签名中的参数元组:`def func_star((a, b))`。当然,这仅适用于固定数量的参数,但如果这是他唯一的情况,则更具可读性。 (2认同)
  • @Space_C0wb0y:`f((a,b))` 语法已在 py3k 中弃用并删除。而这里是不必要的。 (2认同)
  • 也许更 Pythonic:`func = lambda x: func(*x)` 而不是定义一个包装函数 (2认同)
  • @zthomas.nc 这个问题是关于如何支持多处理 pool.map 的多个参数。如果想知道如何通过多处理在不同的 Python 进程中调用方法而不是函数,那么请提出一个单独的问题(如果一切都失败,您始终可以创建一个类似于“func_star()”的包装方法调用的全局函数多于) (2认同)
  • 我希望有`starstarmap`。 (2认同)

sen*_*rle 302

答案取决于版本和情况.最近版本的Python(自3.3版)以来最常见的答案首先由JF Sebastian描述如下.1它使用Pool.starmap接受一系列参数元组的方法.然后它会自动从每个元组解包参数并将它们传递给给定的函数:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
Run Code Online (Sandbox Code Playgroud)

对于早期版本的Python,您需要编写一个辅助函数来显式解包参数.如果你想使用with,你还需要编写一个包装器来Pool转换为上下文管理器.(感谢muon指出这一点.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
Run Code Online (Sandbox Code Playgroud)

在更简单的情况下,使用固定的第二个参数,您也可以使用partial,但仅限于Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
Run Code Online (Sandbox Code Playgroud)

这很大程度上取决于他的答案,而答案本应该被接受.但由于这个问题一直停留在最顶层,因此最好为未来的读者改进它.

  • 我很困惑,你的例子中的“text”变量发生了什么?为什么“RAW_DATASET”似乎被传递了两次。我想你可能有一个错字? (2认同)
  • @muon,很好。看来“Pool”对象直到 Python 3.3 才成为上下文管理器。我添加了一个简单的包装函数,它返回一个“Pool”上下文管理器。 (2认同)

小智 129

我认为以下会更好

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results
Run Code Online (Sandbox Code Playgroud)

产量

[3, 5, 7]
Run Code Online (Sandbox Code Playgroud)

  • 最简单的解决方案 有一个小优化; 删除包装函数并直接在`add`中解包`args`,它适用于任意数量的参数:`def add(args):(x,y)= args` (12认同)
  • 如果您想将“add”的结果存储在列表中,如何使用它? (3认同)
  • 您还可以使用“lambda”函数而不是定义“multi_run_wrapper(..)” (2认同)
  • 嗯...事实上,使用`lambda`不起作用,因为`pool.map(..)`试图挑选给定的函数 (2认同)
  • @Ahmed我喜欢它的样子,因为恕我直言,只要参数的数量不正确,方法调用就会失败。 (2认同)
  • 请在获取 **results = pool.map(...)** 后添加 ```pool.close()``` 和 ```pool.join()```,否则这可能会永远运行 (2认同)

use*_*036 49

使用的Python 3.3+pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()
Run Code Online (Sandbox Code Playgroud)

结果:

1 --- 4
2 --- 5
3 --- 6
Run Code Online (Sandbox Code Playgroud)

如果您愿意,还可以zip()更多参数: zip(a,b,c,d,e)

如果你想要一个常量值作为参数传递,你必须使用import itertools然后zip(itertools.repeat(constant), a)例如.

  • 首先,它删除了许多不必要的东西,并明确指出它适用于python 3.3+,适用于寻找简单而干净的答案的初学者.作为一个初学者,我花了一些时间才弄明白这一点(是JFSebastians的帖子)这就是为什么我写帖子来帮助其他初学者,因为他的帖子只是说"有星图"但没有解释 - 这个是我的帖子打算.所以绝对没有理由用两个downvotes来打击我. (27认同)
  • 这是一个几乎完全重复的答案,就像 2011 年来自 @JFSebastian 的答案一样(60+ 票)。 (2认同)

zee*_*hio 25

JF Sebastian的回答中学习了itertools后,我决定更进一步,编写一个parmap包关注python-2.7和python-3.2(以及后来也可以)的并行化,提供mapstarmap函数的包,可以接受任意数量的位置参数.

安装

pip install parmap
Run Code Online (Sandbox Code Playgroud)

如何并行化:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
Run Code Online (Sandbox Code Playgroud)

我已将parmap上传到PyPI和github存储库.

例如,问题可以回答如下:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
Run Code Online (Sandbox Code Playgroud)


Dan*_*Lee 14

#“如何接受多个参数”。

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)
Run Code Online (Sandbox Code Playgroud)

  • 我不明白为什么我必须一路滚动到这里才能找到最佳答案。 (12认同)
  • 整洁优雅。 (7认同)

Sac*_*hin 12

让我们保持简单明了,参考我的解决方案:

from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint

def dosomething(var,s):
    sleep(randint(1,5))
    print(var)
    return var + s

array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
    resp_ = pool.map(partial(dosomething,s="2"), array)
    print(resp_)
Run Code Online (Sandbox Code Playgroud)

输出:

a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']
Run Code Online (Sandbox Code Playgroud)


Ado*_*obe 10

另一种方法是将列表的列表传递给单参数例程:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])
Run Code Online (Sandbox Code Playgroud)

然后,我们可以用自己喜欢的方法构建一个参数列表。


Mik*_*rns 9

有一个multiprocessing名为pathos的分支(注意:使用github上的版本)不需要starmap- map函数镜像python的map的API,因此map可以带有多个参数.有了pathos,你通常也可以在解释器中进行多处理,而不是卡在__main__块中.在经过一些温和的更新后,Pathos即将发布 - 主要是转换为python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]
Run Code Online (Sandbox Code Playgroud)


Alf*_* M. 8

您可以使用以下两个函数,以避免为每个新函数编写包装器:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))
Run Code Online (Sandbox Code Playgroud)

使用功能function与参数的清单arg_0,arg_1arg_2如下:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)


Ale*_*isz 7

另一个简单的替代方法是将函数参数包装在元组中,然后包装应该在元组中传递的参数.在处理大量数据时,这可能并不理想.我相信它会为每个元组制作副本.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()
Run Code Online (Sandbox Code Playgroud)

以某种随机顺序给出输出:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Run Code Online (Sandbox Code Playgroud)


xmd*_*han 7

python2的更好解决方案:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
Run Code Online (Sandbox Code Playgroud)

2 3 4

1 2 3

0 1 2

出[]:

[3,5,7]


Syr*_*jor 6

更好的方法是使用装饰器而不是手工编写包装函数.特别是当您有许多要映射的函数时,装饰器将通过避免为每个函数编写包装来节省您的时间.通常,装饰函数不可选,但我们可能会使用functools它来绕过它.更多的讨论可以在这里找到.

这里的例子

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

然后你可以用压缩参数映射它

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

当然,您可能总是Pool.starmap在其他答案中提到的Python 3(> = 3.3)中使用.


cda*_*hms 5

这是另一种方法,恕我直言,它比提供的任何其他答案都更简单和优雅。

这个程序有一个函数,它接受两个参数,将它们打印出来并打印总和:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

输出是:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8
Run Code Online (Sandbox Code Playgroud)

有关更多信息,请参阅 python 文档:

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

特别是一定要检查starmap功能。

我使用的是 Python 3.6,我不确定这是否适用于较旧的 Python 版本

为什么文档中没有这样一个非常直接的例子,我不确定。