跟踪joblib.Parallel执行的进度

Cer*_*rin 20 python parallel-processing multithreading multiprocessing joblib

有没有一种简单的方法来跟踪joblib.Parallel执行的整体进度?

我有一个由数千个作业组成的长期执行,我想跟踪并记录在数据库中.但是,要做到这一点,每当Parallel完成任务时,我都需要它来执行回调,报告剩余的剩余作业数.

我之前使用Python的stdlib multiprocessing.Pool完成了类似的任务,通过启动一个记录Pool的作业列表中待处理作业数量的线程.

看看代码,Parallel继承了Pool,所以我认为我可以使用相同的技巧,但它似乎没有使用这些列表,我也无法弄清楚如何"读取"它的内部任何其他方式的状态.

fre*_*ger 49

比 dano 和 Connor 的答案更进一步的是将整个事物包装为上下文管理器:

import contextlib
import joblib
from tqdm import tqdm    
from joblib import Parallel, delayed

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""
    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    old_batch_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_batch_callback
        tqdm_object.close()    
Run Code Online (Sandbox Code Playgroud)

然后你可以像这样使用它,一旦你完成,不要留下猴子补丁代码:

with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))
Run Code Online (Sandbox Code Playgroud)

我认为这很棒,它看起来类似于 tqdm pandas 集成。

  • 我无法编辑它,但解决方案中有一个小拼写错误,其中 joblib.parallel.BatchCompletionCallback 实际上是 BatchCompletionCallBack (请注意 CallBack 上的驼峰式) (4认同)
  • 我刚刚将此代码发布到 PyPI:https://github.com/louisabraham/tqdm_joblib 现在您可以只使用“pip install tqdm_joblib”和“from tqdm_joblib import tqdm_joblib” (3认同)
  • 我认为这不再有效 (3认同)
  • 这应该是最好的答案!谢谢 (2认同)

Jon*_*Jon 18

你为什么不能简单地使用tqdm?以下对我有用

from joblib import Parallel, delayed
from datetime import datetime
from tqdm import tqdm

def myfun(x):
    return x**2

results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in tqdm(range(1000))
100%|??????????| 1000/1000 [00:00<00:00, 10563.37it/s]
Run Code Online (Sandbox Code Playgroud)

  • 我不认为这实际上是在监视正在运行的作业的完成情况,而只是监视作业的排队。如果要在“ myfun”的开始处插入“ time.sleep(1)”,您会发现tqdm进度几乎立即完成,但“结果”还需要几秒钟来填充。 (5认同)
  • 这个答案是不正确的,因为它没有回答问题。这个答案应该不被接受。 (4认同)
  • 是的,这部分是正确的。它正在跟踪作业开始与完成情况,但另一个问题是在所有作业完成后,开销也会导致延迟。完成所有任务后,需要收集结果,这可能需要很长时间。 (3认同)
  • 虽然这个答案在技术上确实是错误的,正如一些评论指出的那样,但它仍然有用:它是最简单的解决方案,比其他答案更容易做到,并且当我将它用于大量短期作业时,完成是排队后时间不会很长,所以在某些情况下已经足够了。 (3认同)
  • 我相信这个答案并没有真正回答这个问题。正如前面提到的,使用这种方法将跟踪 _queuing_ 而不是 _execution_ 本身。下面显示的带有回调的方法似乎与问题有关。 (2认同)

dan*_*ano 13

您链接到Parallel具有可选进度表的状态的文档.它是通过使用以下callback提供的关键字参数实现的multiprocessing.Pool.apply_async:

# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
            kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1
Run Code Online (Sandbox Code Playgroud)

...

class CallBack(object):
    """ Callback used by parallel: it is used for progress reporting, and
        to add data to be processed
    """
    def __init__(self, index, parallel):
        self.parallel = parallel
        self.index = index

    def __call__(self, out):
        self.parallel.print_progress(self.index)
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()
Run Code Online (Sandbox Code Playgroud)

这是print_progress:

def print_progress(self, index):
    elapsed_time = time.time() - self._start_time

    # This is heuristic code to print only 'verbose' times a messages
    # The challenge is that we may not know the queue length
    if self._original_iterable:
        if _verbosity_filter(index, self.verbose):
            return
        self._print('Done %3i jobs       | elapsed: %s',
                    (index + 1,
                     short_format_time(elapsed_time),
                    ))
    else:
        # We are finished dispatching
        queue_length = self.n_dispatched
        # We always display the first loop
        if not index == 0:
            # Display depending on the number of remaining items
            # A message as soon as we finish dispatching, cursor is 0
            cursor = (queue_length - index + 1
                      - self._pre_dispatch_amount)
            frequency = (queue_length // self.verbose) + 1
            is_last_item = (index + 1 == queue_length)
            if (is_last_item or cursor % frequency):
                return
        remaining_time = (elapsed_time / (index + 1) *
                    (self.n_dispatched - index - 1.))
        self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                    (index + 1,
                     queue_length,
                     short_format_time(elapsed_time),
                     short_format_time(remaining_time),
                    ))
Run Code Online (Sandbox Code Playgroud)

他们实现这一点的方式有点奇怪,说实话 - 它似乎假设任务总是按照它们启动的顺序完成.在index这去变量print_progress只是self.n_dispatched在工作实际开始的时间变量.所以推出的第一份工作总是会index以0 结束,即使说第三份工作先完成.这也意味着他们实际上并没有跟踪已完成工作的数量.所以你没有监控的实例变量.

我认为你最好的办法就是制作自己的CallBack课程和猴子补丁并行:

from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed

class CallBack(object):
    completed = defaultdict(int)

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, index):
        CallBack.completed[self.parallel] += 1
        print("done with {}".format(CallBack.completed[self.parallel]))
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.CallBack = CallBack

if __name__ == "__main__":
    print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))
Run Code Online (Sandbox Code Playgroud)

输出:

done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Run Code Online (Sandbox Code Playgroud)

这样,只要作业完成,就会调用您的回调,而不是默认的回调.


小智 10

TLDR 解决方案

使用 python 3.5 与 joblib 0.14.0 和 tq​​dm 4.46.0 配合使用。感谢 frenzykryger 提供的 contextlib 建议,感谢 dano 和 Connor 提供的猴子补丁想法。

import contextlib
import joblib
from tqdm import tqdm
from joblib import Parallel, delayed

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""

    def tqdm_print_progress(self):
        if self.n_completed_tasks > tqdm_object.n:
            n_completed = self.n_completed_tasks - tqdm_object.n
            tqdm_object.update(n=n_completed)

    original_print_progress = joblib.parallel.Parallel.print_progress
    joblib.parallel.Parallel.print_progress = tqdm_print_progress

    try:
        yield tqdm_object
    finally:
        joblib.parallel.Parallel.print_progress = original_print_progress
        tqdm_object.close()
Run Code Online (Sandbox Code Playgroud)

您可以按照 frenzykryger 描述的相同方式使用它

import time
def some_method(wait_time):
    time.sleep(wait_time)

with tqdm_joblib(tqdm(desc="My method", total=10)) as progress_bar:
    Parallel(n_jobs=2)(delayed(some_method)(0.2) for i in range(10))
Run Code Online (Sandbox Code Playgroud)

更长的解释:

Jon 的解决方案实现起来很简单,但它只测量已调度的任务。如果任务花费的时间很长,则在等待最后一个分派的任务完成执行时,进度条将停留在 100%。

frenzykryger 的上下文管理器方法(从 dano 和 Connor 改进而来)更好,但也可以在任务完成之前BatchCompletionCallBack调用(请参阅joblib 的中间结果)。这将使我们得到超过 100% 的计数。ImmediateResult

BatchCompletionCallBack我们可以直接修补print_progress中的函数,而不是猴子修补Parallel。无论如何,已经BatchCompletionCallBack这么称呼了print_progress。如果设置了 verbose(即Parallel(n_jobs=2, verbose=100)),print_progress将打印出已完成的任务,尽管不如 tqdm 那么好。查看代码,它print_progress是一个类方法,因此它已经self.n_completed_tasks记录了我们想要的数字。我们所要做的只是将其与 joblib 进度的当前状态进行比较,并仅在存在差异时进行更新。

这是使用 python 3.5 在 joblib 0.14.0 和 tq​​dm 4.46.0 中进行测试的。


Max*_*sky 6

从 2023 年 6 月发布的 joblib v1.3.0 开始,有一种更简单的方法可以joblib.Parallel使用 tqdm 进度条进行换行(受此评论启发)。

该进度条将跟踪作业完成情况,而不是作业排队情况。以前这需要一个特殊的上下文管理器。这是一个例子:

from joblib import Parallel, delayed
from tqdm import tqdm

import time
import random

# Our example worker will sleep for a certain number of seconds.

inputs = list(range(10))
random.shuffle(inputs)

def worker(n_seconds):
    time.sleep(n_seconds)
    return n_seconds

# Run the worker jobs in parallel, with a tqdm progress bar.
# We configure Parallel to return a generator.
# Then we wrap the generator in tqdm.
# Finally, we execute everything by converting the tqdm generator to a list.

outputs = list(
    tqdm(
        # Note the new return_as argument here, which requires joblib >= 1.3:
        Parallel(return_as="generator", n_jobs=3)(
            delayed(worker)(n_seconds) for n_seconds in inputs
        ),
        total=len(inputs),
    )
)
print(outputs)
Run Code Online (Sandbox Code Playgroud)

  • 优秀的解决方案!我认为现在 joblib v1.3.0 已经发布了,这可能应该是公认的答案。它效果很好,并且比其他解决方案简单得多。 (2认同)

Con*_*ark 5

为最新版本的joblib库扩展dano的答案。内部实现有几处更改。

from joblib import Parallel, delayed
from collections import defaultdict

# patch joblib progress callback
class BatchCompletionCallBack(object):
  completed = defaultdict(int)

  def __init__(self, time, index, parallel):
    self.index = index
    self.parallel = parallel

  def __call__(self, index):
    BatchCompletionCallBack.completed[self.parallel] += 1
    print("done with {}".format(BatchCompletionCallBack.completed[self.parallel]))
    if self.parallel._original_iterator is not None:
      self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
Run Code Online (Sandbox Code Playgroud)