use*_*897 458 python multiprocessing
Is there a variant of pool.map in Python's multiprocessing library that supports multiple arguments?
import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()
jfs*_*jfs 442
Is there a variant of pool.map that supports multiple arguments?
Python 3.3 includes the pool.starmap() method:
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()
For older versions:
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()
Output:
1 1
2 1
3 1
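Notice how itertools.izip() and itertools.repeat() are used here.
Due to the bug mentioned by @unutbu, you can't use functools.partial() or similar capabilities on Python 2.6, so the simple wrapper function func_star() should be defined explicitly. See also the workaround suggested by uptimebox.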
sen*_*rle 302
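The answer is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described by J.F. Sebastian. [1] It uses the Pool.starmap method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function: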
import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
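For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager. (Thanks to muon for pointing this out.)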
import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
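One detail of the poolcontext wrapper is worth a note: with contextlib.contextmanager, an exception raised inside the with block is re-raised at the yield, so a pool.terminate() placed after a bare yield would be skipped in that case. The following is a minimal sketch of a try/finally variant, not part of the original answer. It uses multiprocessing.pool.ThreadPool as a stand-in (an assumption made so the snippet also runs in an interactive session); ThreadPool exposes the same map interface, and swapping in multiprocessing.Pool is expected, though not shown here, to work the same way under an `if __name__ == '__main__':` guard.

```python
from contextlib import contextmanager
from itertools import product
from multiprocessing.pool import ThreadPool


def merge_names(a, b):
    return '{} & {}'.format(a, b)


def merge_names_unpack(args):
    # Unpack one (a, b) tuple into two positional arguments.
    return merge_names(*args)


@contextmanager
def poolcontext(*args, **kwargs):
    # ThreadPool is a stand-in for multiprocessing.Pool (assumption).
    pool = ThreadPool(*args, **kwargs)
    try:
        yield pool
    finally:
        # Runs even if the body of the `with` block raises.
        pool.terminate()


def run():
    names = ['Brown', 'Wilson', 'Bartlett']
    with poolcontext(processes=3) as pool:
        return pool.map(merge_names_unpack, product(names, repeat=2))


if __name__ == '__main__':
    print(run())
```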
In simpler cases, with a fixed second argument, you can also use partial, but only in Python 2.7+.
import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
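functools.partial binds positional arguments from the left, which is why the example passes b='Sons' as a keyword to fix the second parameter. A short sketch of the difference follows; the names fix_first and fix_second are illustrative only, and ThreadPool is used as a stand-in for a process pool (an assumption, chosen so the snippet runs without a __main__ guard).

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def merge_names(a, b):
    return '{} & {}'.format(a, b)


def run():
    names = ['Brown', 'Wilson']
    # A positional argument is bound to the first parameter (a).
    fix_first = partial(merge_names, 'Sons')
    # A keyword argument can bind a later parameter (b).
    fix_second = partial(merge_names, b='Sons')
    with ThreadPool(2) as pool:
        return pool.map(fix_first, names), pool.map(fix_second, names)


if __name__ == '__main__':
    print(run())
```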
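[1] Much of this was inspired by his answer, which should probably have been accepted instead. But since this one is stuck at the top, it seemed best to improve it for future readers.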
小智 129
I think the following would be better:
def multi_run_wrapper(args):
    return add(*args)

def add(x,y):
    return x+y

if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print(results)
Output:
[3, 5, 7]
use*_*036 49
Using Python 3.3+ with pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"]

pool = ThreadPool(2)
pool.starmap(write, zip(a,b))
pool.close()
pool.join()
Result:
1 --- 4
2 --- 5
3 --- 6
You can also zip() more arguments if you like: zip(a,b,c,d,e)
If you want to pass a constant value as an argument, import itertools and then use, for example, zip(itertools.repeat(constant), a).
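As a minimal sketch of the constant-argument case, the snippet below zips itertools.repeat() with two lists and feeds the result to starmap. The function describe and the value "row" are made up for illustration; the thread-based pool from multiprocessing.dummy is the same one used in the example above.

```python
from itertools import repeat
from multiprocessing.dummy import Pool as ThreadPool


def describe(prefix, i, x):
    return "{}: {} --- {}".format(prefix, i, x)


def run():
    a = ["1", "2", "3"]
    b = ["4", "5", "6"]
    with ThreadPool(2) as pool:
        # zip() stops at the shortest input, so the endless repeat() is safe.
        return pool.starmap(describe, zip(repeat("row"), a, b))


if __name__ == "__main__":
    print(run())
```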
zee*_*hio 25
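After learning about itertools in J.F. Sebastian's answer, I decided to take it a step further and write a parmap package that takes care of parallelization, offering map and starmap functions on python-2.7 and python-3.2 (and later) that can take any number of positional arguments.
Installation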
pip install parmap
How to parallelize:
import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)
# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)
# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param1 = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
    listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
I have uploaded parmap to PyPI and to a github repository.
As an example, the question can be answered as follows:
import parmap

def harvester(case, text):
    X = case[0]
    text + str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
Dan*_*Lee 14
def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4)
    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)
Sac*_*hin 12
Let's keep it simple and straightforward; here is my solution:
from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint

def dosomething(var,s):
    sleep(randint(1,5))
    print(var)
    return var + s

array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
    resp_ = pool.map(partial(dosomething,s="2"), array)
    print(resp_)
Output:
a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']
Ado*_*obe 10
Another way is to pass a list of lists to a one-argument routine:
import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()
pool.map(task, [
    [1,2],
    [3,4],
    [5,6],
    [7,8]
])
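One can then build the list of arguments with one's favorite method.
There is a fork of multiprocessing called pathos (note: use the version on github) that doesn't need starmap: its map functions mirror the API of Python's map, so map can take multiple arguments. With pathos, you can also generally do multiprocessing in the interpreter, instead of being stuck in the __main__ block. Pathos is due for a release after some mild updating, mostly conversion to python 3.x.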
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
...     print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]
You can use the following two functions to avoid writing a wrapper for each new function:
import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))
Use the function function with the lists of arguments arg_0, arg_1 and arg_2 as follows:
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2))
pool.close()
pool.join()
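To make the pairing concrete, here is a small end-to-end sketch of universal_worker and pool_args with the built-in pow as the mapped function. The input lists are made up, and ThreadPool is used as a stand-in (an assumption, so the snippet runs without a __main__ guard). With a process pool, each (function, args) pair is pickled per task, so the function would need to be picklable, for example defined at module level.

```python
import itertools
from multiprocessing.pool import ThreadPool


def universal_worker(input_pair):
    # Each task carries the function to call together with its arguments.
    function, args = input_pair
    return function(*args)


def pool_args(function, *args):
    # Pair the same function with each tuple of zipped arguments.
    return zip(itertools.repeat(function), zip(*args))


def run():
    bases = [1, 2, 3]
    exponents = [4, 5, 6]
    with ThreadPool(2) as pool:
        return pool.map(universal_worker, pool_args(pow, bases, exponents))


if __name__ == "__main__":
    print(run())
```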
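Another simple alternative is to wrap your function parameters in a tuple and then wrap the arguments that should be passed in tuples as well. This is perhaps not ideal when dealing with large pieces of data. I believe it would make copies for each tuple.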
from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()
Gives the output in some random order:
0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
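The def f((a,b,c,d)) signature relies on tuple parameter unpacking, which exists only in Python 2; Python 3 removed it (PEP 3113). A sketch of one Python 3 equivalent is to accept a single tuple and unpack it in the function body. ThreadPool is used here as a stand-in for Pool (an assumption, so that the snippet runs anywhere).

```python
from multiprocessing.pool import ThreadPool


def f(args):
    # Unpack the tuple inside the body instead of in the signature.
    a, b, c, d = args
    return a + b + c + d


def run():
    data = [(i, i + 1, i + 2, i + 3) for i in range(10)]
    with ThreadPool(4) as pool:
        # map() returns results in input order, whatever order tasks finish in.
        return pool.map(f, data)


if __name__ == '__main__':
    print(run())
```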
A better solution for python2:
from multiprocessing import Pool

def func((i, (a, b))):
    print i, a, b
    return a + b

pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
Output:
2 3 4
1 2 3
0 1 2
Out[]:
[3,5,7]
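A better way is to use a decorator instead of writing a wrapper function by hand. Especially when you have many functions to map, a decorator saves time by avoiding a separate wrapper for each one. A decorated function is usually not picklable, but functools can be used to get around that. More discussion can be found here.
Here is the example: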
def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y
Then you can map it with zipped arguments:
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
Of course, you can always use Pool.starmap in Python 3 (>= 3.3), as mentioned in other answers.
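Because the wrapper checks for dict, the same decorated function can be mapped over a mix of positional tuples and keyword dicts. The sketch below illustrates that; the function power and the task list are made up for illustration, and ThreadPool is used as a stand-in (an assumption) so that pickling is not involved.

```python
from functools import wraps
from multiprocessing.pool import ThreadPool


def unpack_args(func):
    @wraps(func)
    def wrapper(args):
        # A dict is expanded as keyword arguments, anything else positionally.
        if isinstance(args, dict):
            return func(**args)
        return func(*args)
    return wrapper


@unpack_args
def power(base, exponent):
    return base ** exponent


def run():
    tasks = [(2, 3), {"base": 3, "exponent": 2}, {"exponent": 0, "base": 5}]
    with ThreadPool(2) as pool:
        return pool.map(power, tasks)


if __name__ == "__main__":
    print(run())
```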
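Here is another way to do it that, IMHO, is simpler and more elegant than any of the other answers provided.
This program has a function that takes two parameters, prints them out, and also prints the sum: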
import multiprocessing

def main():
    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with
# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()
The output is:
num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8
See the Python docs for more info:
https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool
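In particular, be sure to check out the starmap function.
I'm using Python 3.6; I'm not sure whether this works with older Python versions.
Why there isn't a very straightforward example like this in the docs, I'm not sure.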