How can we use tqdm in a parallel execution with joblib?

Dro*_*man 22 python parallel-processing joblib tqdm

I want to run a function in parallel and wait until all parallel nodes are done, using joblib. Like in the example:

from math import sqrt
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))

However, I want the execution to be shown in a single progress bar, as with tqdm, displaying how many jobs have completed.

How would you do that?

tyr*_*rex 29

Just put range(10) inside tqdm(...)! It probably seemed too good to be true to you, but it really works (on my machine):

from math import sqrt
from joblib import Parallel, delayed  
from tqdm import tqdm  
result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000)))

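  • This only shows progress when the processes start, not when they finish: `Parallel(n_jobs=10)(delayed(time.sleep)(i ** 2) for i in tqdm(range(10)))`. (5 upvotes)
  • This does not work; `tqdm` goes to 100% immediately. (5 upvotes)
  • It works, but not with, for example, a list of strings... I also tried wrapping the list in `iter`... (2 upvotes)
  • Another question has a very elegant [solution](/sf/answers/4125568821/) to this problem. (2 upvotes)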

nie*_*akh 20

I've created pqdm, a parallel tqdm wrapper built on concurrent futures, to get this done comfortably. Give it a try!

To install

pip install pqdm

and to use

from pqdm.processes import pqdm
# If you want threads instead:
# from pqdm.threads import pqdm

args = [1, 2, 3, 4, 5]
# args = range(1,6) would also work

def square(a):
    return a*a

result = pqdm(args, square, n_jobs=2)


Hen*_*nça 15

No need to install additional packages. You can use tqdm's native support in contrib.concurrent: https://tqdm.github.io/docs/contrib.concurrent/

from tqdm.contrib.concurrent import process_map
# If you want threads instead:
# from tqdm.contrib.concurrent import thread_map
import time

args = range(5)

def square(a):
    time.sleep(a)
    return a*a

result = process_map(square, args, max_workers=2)
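
The thread-based variant mentioned in the comment of the snippet above might look like the following sketch. It uses `thread_map` from the same module; the short sleep only stands in for I/O-bound work and is an assumption of this example.

```python
import time

from tqdm.contrib.concurrent import thread_map


def square(a):
    # Stand-in for I/O-bound work; the delay is illustrative only.
    time.sleep(0.01)
    return a * a


# thread_map returns the results as a list, in input order,
# while drawing a progress bar as the items complete.
result = thread_map(square, range(5), max_workers=2)
```

Threads avoid pickling the function and its arguments, which can matter when `square` is defined interactively, but they do not speed up CPU-bound pure-Python code.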


use*_*430 12

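Modifying nth's great answer to allow a dynamic flag for using TQDM or not, and to specify the total ahead of time so that the status bar fills in correctly.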

from tqdm.auto import tqdm
from joblib import Parallel

class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()
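
The answer does not show how the class is called. A minimal usage sketch, repeating the class so that the block runs on its own, could look like this. The task count of 10 and the `sqrt` workload are borrowed from the question; the variable names are illustrative.

```python
from math import sqrt

from joblib import Parallel, delayed
from tqdm.auto import tqdm


class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        # joblib calls this hook as batches complete.
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()


# With a known total, the bar can show a meaningful percentage from the start.
with_bar = ProgressParallel(n_jobs=2, total=10)(
    delayed(sqrt)(i ** 2) for i in range(10)
)

# The same call with the bar switched off, e.g. for batch jobs or logs.
without_bar = ProgressParallel(n_jobs=2, use_tqdm=False, total=10)(
    delayed(sqrt)(i ** 2) for i in range(10)
)
```

Pass joblib's own options such as `n_jobs` as keyword arguments, since the first two positional parameters are taken by `use_tqdm` and `total`.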


jth*_*jth 11

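As noted above, solutions that simply wrap the iterable passed to joblib.Parallel() do not really monitor the progress of the execution. Instead, I suggest subclassing Parallel and overriding the print_progress() method, as follows: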

import joblib
from tqdm.auto import tqdm

class ProgressParallel(joblib.Parallel):
    def __call__(self, *args, **kwargs):
        with tqdm() as self._pbar:
            return joblib.Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()


Pur*_*reW 7

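If your problem consists of many parts, you could split the parts into k subgroups, run each subgroup in parallel, and update the progress bar in between, resulting in k updates of the progress.

The following example from the documentation demonstrates this.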

>>> with Parallel(n_jobs=2) as parallel:
...    accumulator = 0.
...    n_iter = 0
...    while accumulator < 1000:
...        results = parallel(delayed(sqrt)(accumulator + i ** 2)
...                           for i in range(5))
...        accumulator += sum(results)  # synchronization barrier
...        n_iter += 1

https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers

  • How does this answer the question about "a single progress bar"? (2 upvotes)
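
The documentation excerpt above reuses a worker pool but draws no bar. One way the subgroup idea could be tied to a single tqdm bar is sketched below. The helper `run_in_chunks` and its `chunk_size` parameter are hypothetical names introduced for this example, not part of joblib or tqdm.

```python
from math import sqrt

from joblib import Parallel, delayed
from tqdm import tqdm


def run_in_chunks(values, chunk_size=25, n_jobs=2):
    """Run sqrt(v ** 2) over values in chunks, updating one bar per chunk."""
    results = []
    # Reuse one worker pool for all chunks and keep a single bar open.
    with Parallel(n_jobs=n_jobs) as parallel, tqdm(total=len(values)) as pbar:
        for start in range(0, len(values), chunk_size):
            chunk = values[start:start + chunk_size]
            # Each call blocks until the whole chunk has finished.
            results.extend(parallel(delayed(sqrt)(v ** 2) for v in chunk))
            pbar.update(len(chunk))
    return results


chunked = run_in_chunks(list(range(100)))
```

The bar only moves once per chunk, and each chunk waits for its slowest task, so smaller chunks give finer updates at the cost of more synchronization.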

Ben*_*man 7

Here is a possible workaround

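import random
import time

from joblib import Parallel, delayed
from tqdm import tqdm

def func(x):
    time.sleep(random.randint(1, 10))
    return x

def text_progessbar(seq, total=None):
    # Plain-text fallback: print one status line per item taken from seq.
    tick = time.time()
    total_str = 'of %d' % total if total else ''
    for step, item in enumerate(seq, 1):
        time_diff = time.time() - tick
        avg_speed = time_diff / step  # elapsed seconds per item so far
        print('step', step, '%.2f' % time_diff,
              'avg: %.2f sec/iter' % avg_speed, total_str)
        yield item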

all_bar_funcs = {
    'tqdm': lambda args: lambda x: tqdm(x, **args),
    'txt': lambda args: lambda x: text_progessbar(x, **args),
    'False': lambda args: iter,
    'None': lambda args: iter,
}

def ParallelExecutor(use_bar='tqdm', **joblib_args):
    def aprun(bar=use_bar, **tq_args):
        def tmp(op_iter):
            if str(bar) in all_bar_funcs.keys():
                bar_func = all_bar_funcs[str(bar)](tq_args)
            else:
                raise ValueError("Value %s not supported as bar type"%bar)
            return Parallel(**joblib_args)(bar_func(op_iter))
        return tmp
    return aprun

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))


Lab*_*abo 6

I created tqdm_joblib to solve this problem.

Installation: pip install tqdm-joblib

From the readme:

Simple snippet copied from /sf/answers/4125568821/, packaged for simple reuse.

from math import sqrt
from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib

with tqdm_joblib(desc="My calculation", total=10) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))
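
For readers who would rather not add a dependency, a context manager of this kind can be sketched by temporarily swapping joblib's batch completion callback. This is an illustration of the general technique, not the package's actual source. It relies on `joblib.parallel.BatchCompletionCallBack`, an internal class that may change between joblib releases, and the name `tqdm_joblib_sketch` is made up for this example.

```python
import contextlib
from math import sqrt

import joblib
from joblib import Parallel, delayed
from tqdm import tqdm


@contextlib.contextmanager
def tqdm_joblib_sketch(tqdm_object):
    """Hypothetical helper: advance tqdm_object whenever a joblib batch ends."""

    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __call__(self, *args, **kwargs):
            # batch_size is the number of tasks in the batch that just finished.
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    # Assumption: joblib looks this class up on its module at dispatch time.
    old_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        # Always restore joblib's own callback and close the bar.
        joblib.parallel.BatchCompletionCallBack = old_callback
        tqdm_object.close()


with tqdm_joblib_sketch(tqdm(desc="My calculation", total=10)):
    patched = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
```

Since this patches a module-level name, it affects every `Parallel` call made while the context is active, including calls from other threads.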