ast*_*ada 3 python dictionary multiprocessing keyword-argument pathos
我正在使用pathos.multiprocessing来并行化需要使用实例方法的程序.这是一个最小的工作示例:
import time
import numpy as np
from pathos.multiprocessing import Pool, ProcessingPool, ThreadingPool
class dummy(object):
def __init__(self, arg, key1=None, key2=-11):
np.random.seed(arg)
randnum = np.random.randint(0, 5)
print 'Sleeping {} seconds'.format(randnum)
time.sleep(randnum)
self.value = arg
self.more1 = key1
self.more2 = key2
args = [0, 10, 20, 33, 82]
keys = ['key1', 'key2']
k1val = ['car', 'borg', 'syria', 'aurora', 'libera']
k2val = ['a', 'b', 'c', 'd', 'e']
allks = [dict(zip(keys, [k1val[i], k2val[i]])) for i in range(5)]
pool = ThreadingPool(4)
result = pool.map(dummy, args, k1val, k2val)
print [[r.value, r.more1, r.more2] for r in result]
Run Code Online (Sandbox Code Playgroud)
打印结果(如预期):
Sleeping 4 seconds
Sleeping 1 seconds
Sleeping 3 seconds
Sleeping 4 seconds
Sleeping 3 seconds
[[0, 'car', 'a'], [10, 'borg', 'b'], [20, 'syria', 'c'], [33, 'aurora', 'd'], [82, 'libera', 'e']]
Run Code Online (Sandbox Code Playgroud)
但是,在这个调用map
最后两个参数的顺序很重要,如果我这样做:
result2 = pool.map(dummy, args, k2val, k1val)
Run Code Online (Sandbox Code Playgroud)
我获得:
[[0, 'a', 'car'], [10, 'b', 'borg'], [20, 'c', 'syria'], [33, 'd', 'aurora'], [82, 'e', 'libera']]
Run Code Online (Sandbox Code Playgroud)
而我想获得与第一个结果相同的结果.行为apply_async
kwds
与标准模块中的行为相同multiprocessing
,即传递字典列表,其中每个字典中的键是关键字名称,项是关键字参数(请参阅参考资料allks
).请注意,标准模块multiprocessing
不能使用实例方法,因此甚至不满足最低要求.
暂时这将是:result = pool.map(dummy,args,kwds = allks)#这不起作用
我是pathos
作者.是的,你遇到了一些我知道需要一点工作的东西.目前,map
和pipe
(即apply
)方法来自ProcessPool
,ThreadPool
而且ParallelPool
不能采取kwds
- 你必须将它们作为传递args
.但是,如果使用_ProcessPool
或_ThreadPool
,那么你可以通过kwds
自己map
和apply
方法.pathos.pools
以下划线开头的池实际上直接来自multiprocess
,因此它们具有multiprocessing
与之相同的API (但具有更好的序列化,因此可以传递类方法等).
>>> from pathos.pools import _ProcessPool
>>> from multiprocess.pool import Pool
>>> Pool is _ProcessPool
True
Run Code Online (Sandbox Code Playgroud)
因此,对原始代码的编辑看起来像这样(来自OP的建议编辑):
>>> from pathos.pools import _ThreadPool
>>> pool = _ThreadPool(4)
>>>
[…]
>>> result = []
>>> def callback(x):
>>> result.append(x)
>>>
>>> for a, k in zip(args, allks):
>>> pool.apply_async(dummy, args=(a,), kwds=k, callback=callback)
>>>
>>> pool.close()
>>> pool.join()
Run Code Online (Sandbox Code Playgroud)