多处理:使用tqdm显示进度条

Sci*_*iPy 67 python multiprocessing progress-bar tqdm

为了使我的代码更"pythonic"和更快,我使用"多处理"和一个map函数发送它a)函数和b)迭代范围.

植入的解决方案(即直接在范围tqdm.tqdm(范围(0,30))上调用tqdm不适用于多处理(如下面的代码所示).

进度条显示从0到100%(当python读取代码?)但它不指示map函数的实际进度.

如何显示一个进度条,指示"地图"功能在哪一步?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()
Run Code Online (Sandbox Code Playgroud)

欢迎任何帮助或建议......

hky*_*kyi 88

使用imap而不是map,它返回已处理值的迭代器.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
Run Code Online (Sandbox Code Playgroud)

  • 封闭的list()语句等待迭代器结束.total =也是必需的,因为tqdm不知道迭代将持续多长时间, (12认同)
  • 对于`starmap()`有类似的解决方案吗? (8认同)
  • 我认为这个解决方案不能正常工作。几乎一直保持在 0%,然后突然变成 100%。 (7认同)
  • 当 `p.imap` 的特定 `chunk_size` 时,行为是连线的。`tqdm` 可以更新每次迭代而不是每个块吗? (4认同)
  • `for i in tqdm.tqdm(...): pass ` 可能更直接,即 `list(tqdm.tqdm)` (3认同)
  • 这有效,但有没有其他人让它在每次迭代的换行符上连续打印进度条? (2认同)

cas*_*dcl 86

抱歉迟到了,但如果您只需要一个并发映射,我将这个功能添加到tqdm>=4.42.0

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)
Run Code Online (Sandbox Code Playgroud)

参考资料:https : //tqdm.github.io/docs/contrib.concurrent/https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

它支持max_workerschunksize,您也可以轻松地从 切换process_mapthread_map

  • @jlconlin @Vladimir Vargas 如果我今天在 Jupyter Notebook 中执行类似 `thread_map(fn, *iterables, tq​​dm_class=tqdm.notebook.tqdm, max_workers=12)` 的操作,我不会遇到任何问题。 (5认同)
  • 嗯.. 进度条停留在零时完成。 (4认同)
  • @Xudong `process_map` 创建、运行、关闭/连接并返回一个列表。 (3认同)
  • 这很棒!很高兴我找到了它。还有一个问题,当我在 jupyter 笔记本中使用它时,它的工作效果不是很好。我知道有一个“tqdm.notebook”,有什么办法可以将两者合并吗? (3认同)
  • 我看到讨论破解 tqdm_notebook 的问题,但是无法制定解决方案来解决 tqdm.contrib.concurrent。 (2认同)
  • 这会产生迭代参数的无条件副本,而其他似乎是写时复制。 (2认同)
  • 当我尝试这个时,我的进度条停留在零并且永远不会更新。 (2认同)

Sci*_*iPy 39

找到解决方案:小心!由于多处理,估计时间(每个循环的迭代,总时间等等......可能不稳定)但进度条工作正常.

注意:Pool的上下文管理器仅适用于Python 3.3版

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in tqdm(enumerate(p.imap_unordered(_foo, range(0, max_)))):
                pbar.update()
Run Code Online (Sandbox Code Playgroud)

  • 这里需要第二次/内部`tqdm`调用吗? (4认同)
  • 问题中以“ r”返回的_foo(my_number)的输出如何? (3认同)
  • @shadowtalker - 它似乎没有;)。无论如何 - `imap_unordered` 是这里的关键,它提供最佳性能和最佳进度条估计。 (3认同)
  • 如何使用此解决方案检索结果? (3认同)
  • 不需要pbar.close(),它将在with终止时自动关闭 (2认同)
  • starmap()是否有类似的解决方案? (2认同)

Oli*_*ken 8

根据XaviMartínez的答案,我写了这个函数imap_unordered_bar.它的使用方式imap_unordered与显示处理栏的唯一区别相同.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Run Code Online (Sandbox Code Playgroud)

  • 这将在新行的每一步重新绘制条形图。如何更新同一行? (3认同)

Nul*_*yte 7

import multiprocessing as mp
import tqdm


iterable = ... 
num_cpu = mp.cpu_count() - 2 # dont use all cpus.


def func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(num_cpu) as p:
        list(tqdm.tqdm(p.imap(func, iterable), total=len(iterable)))
Run Code Online (Sandbox Code Playgroud)


Hum*_*eed 7

对于带有 apply_async 的进度条,我们可以使用以下建议的代码:

https://github.com/tqdm/tqdm/issues/484

import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

pool = Pool(2)
pbar = tqdm(total=100)

def update(*a):
    pbar.update()

for i in range(pbar.total):
    pool.apply_async(myfunc, args=(i,), callback=update)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)


小智 6

您可以p_tqdm改用。

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))
Run Code Online (Sandbox Code Playgroud)

  • `p_tqdm` 仅限于 `multiprocessing.Pool`,不适用于线程 (3认同)