sdg*_*wer 7 python multiprocessing python-multiprocessing tqdm process-pool
我正在做一些并行处理,如下所示:
with mp.Pool(8) as tmpPool:
        results = tmpPool.starmap(my_function, inputs)
Run Code Online (Sandbox Code Playgroud)
输入看起来像:[(1,0.2312),(5,0.52)...],即一个int和float的元组。
该代码运行良好,但我似乎无法将其包裹在加载栏(tqdm)上,例如可以使用imap方法完成,如下所示:
tqdm.tqdm(mp.imap(some_function,some_inputs))
Run Code Online (Sandbox Code Playgroud)
星图也可以做到吗?
谢谢!
hon*_*boy 20
正如 Darkonaut 提到的,在撰写本文时,还没有istarmap本地可用的。如果您想避免修补,可以添加一个简单的 *_star函数作为解决方法。(这个解决方案的灵感来自于本教程。)
import tqdm
import multiprocessing
def my_function(arg1, arg2, arg3):
  return arg1 + arg2 + arg3
def my_function_star(args):
    return my_function(*args)
jobs = 4
with multiprocessing.Pool(jobs) as pool:
    args = [(i, i, i) for i in range(10000)]
    results = list(tqdm.tqdm(pool.imap(my_function_star, args), total=len(args))
Run Code Online (Sandbox Code Playgroud)
一些注意事项:
我也很喜欢科里的回答。它更干净,尽管进度条的更新似乎不像我的答案那么顺利。请注意,使用我上面发布的代码chunksize=1(默认),Corey 的答案要快几个数量级。我猜测这是由于多处理序列化造成的,因为增加chunksize(或具有更昂贵的my_function)使它们的运行时间具有可比性。
我对我的应用程序给出了答案,因为我的序列化/功能成本比非常低。
cor*_*rey 14
最简单的方法可能是在输入周围应用 tqdm(),而不是映射函数。例如:
inputs = zip(param1, param2, param3)
with mp.Pool(8) as pool:
    results = pool.starmap(my_function, tqdm.tqdm(inputs, total=len(inputs)))
Run Code Online (Sandbox Code Playgroud)
        不可能starmap(),但是添加补丁是可能的Pool.istarmap()。它基于的代码imap()。您要做的就是创建istarmap.py-file并导入模块以应用补丁,然后再进行常规的multiprocessing-imports。
# istarmap.py
import multiprocessing.pool as mpp
def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    if self._state != mpp.RUN:
        raise ValueError("Pool not running")
    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not {0:n}".format(
                chunksize))
    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self._cache)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)
mpp.Pool.istarmap = istarmap
Run Code Online (Sandbox Code Playgroud)
然后在您的脚本中:
import istarmap  # import to apply patch
from multiprocessing import Pool
import tqdm    
def foo(a, b):
    for _ in range(int(50e6)):
        pass
    return a, b    
if __name__ == '__main__':
    with Pool(4) as pool:
        iterable = [(i, 'x') for i in range(10)]
        for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
                           total=len(iterable)):
            pass
Run Code Online (Sandbox Code Playgroud)
        |   归档时间:  |  
           
  |  
        
|   查看次数:  |  
           340 次  |  
        
|   最近记录:  |