大熊猫运营期间的进度指标(python)

cwh*_*and 118 python ipython pandas

我经常对超过1500万行的数据帧执行pandas操作,我很乐意访问特定操作的进度指示器.

是否存在基于文本的pandas split-apply-combine操作的进度指示器?

例如,在以下情况中:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
Run Code Online (Sandbox Code Playgroud)

where feature_rollup是一个涉及多少DF列的函数,并通过各种方法创建新的用户列.对于大型数据帧,这些操作可能需要一段时间,因此我想知道是否可以在iPython笔记本中使用基于文本的输出来更新我的进度.

到目前为止,我已经尝试了Python的规范循环进度指示器,但它们没有以任何有意义的方式与pandas交互.

我希望在pandas库/文档中我忽略了一些让人们知道split-apply-combine进度的东西.一个简单的实现可能会查看apply函数工作的数据帧子集的总数,并将进度报告为这些子集的已完成部分.

这可能是需要添加到库中的吗?

cas*_*dcl 198

由于受欢迎的需求,tqdm增加了支持pandas.与其他答案不同,这不会明显减慢熊猫速度 - 这是一个例子DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)
Run Code Online (Sandbox Code Playgroud)

如果您对它的工作方式感兴趣(以及如何为自己的回调修改它),请参阅github上示例,pypi完整文档,或导入模块并运行help(tqdm).

编辑


要直接回答原始问题,请替换:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)
Run Code Online (Sandbox Code Playgroud)

有:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)
Run Code Online (Sandbox Code Playgroud)

注意:tqdm <= v4.8:对于低于4.8的tqdm版本,而不是tqdm.pandas()你必须这样做:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
Run Code Online (Sandbox Code Playgroud)

  • 顺便说一句,如果你使用Jupyter笔记本,你也可以使用tqdm_notebooks来获得更漂亮的吧.和pandas一起,你现在需要像`来自tqdm import tqdm_notebook;来实例化它.tqdm_notebook().pandas(*args,**kwargs)`[见这里](http://stackoverflow.com/questions/40476680/how-to-use-tqdm-with-pandas-in-a-jupyter-notebook ) (5认同)
  • `tqdm`实际上是为最初的普通迭代创建的:`来自tqdm import tqdm; 对于我在tqdm(范围(int(1e8))):pass`熊猫支持是我最近的一个黑客:) (4认同)
  • 从版本4.8.1开始 - 改为使用tqdm.pandas().https://github.com/tqdm/tqdm/commit/ad9abcc63330ad5d22fd8fca83dcec3e9d37afe6 (2认同)
  • 这太棒了。谢谢 (2认同)

And*_*den 14

调整Jeff的答案(并将其作为可重复使用的函数).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res
Run Code Online (Sandbox Code Playgroud)

注意:应用进度百分比内联更新.如果您的功能stdouts然后这将无法正常工作.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...
Run Code Online (Sandbox Code Playgroud)

像往常一样,您可以将此作为方法添加到groupby对象:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...
Run Code Online (Sandbox Code Playgroud)

正如评论中所提到的,这不是核心熊猫有兴趣实现的功能.但是python允许你为许多pandas对象/方法创建这些(这样做会有相当多的工作......虽然你应该能够概括这种方法).


mor*_*ork 12

对于希望将 tqdm 应用于其自定义并行 Pandas-apply 代码的任何人。

(多年来,我尝试了一些用于并行化的库,但我从未找到 100% 并行化的解决方案,主要是针对 apply 函数,而且我总是不得不回来寻找我的“手动”代码。)

df_multi_core - 这就是你所说的。它接受:

  1. 你的 df 对象
  2. 您要调用的函数名称
  3. 可以在其上执行函数的列子集(有助于减少时间/内存)
  4. 并行运行的作业数量(-1 或省略所有内核)
  5. df 函数接受的任何其他 kwargs(如“轴”)

_df_split - 这是一个内部辅助函数,必须全局定位到正在运行的模块(Pool.map 是“位置相关”),否则我会在内部定位它..

这是我的要点中的代码(我将在那里添加更多的熊猫功能测试):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results
Run Code Online (Sandbox Code Playgroud)

Bellow 是使用 tqdm "progress_apply"进行并行化应用的测试代码。

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))
Run Code Online (Sandbox Code Playgroud)

在输出中,您可以看到 1 个未并行化运行的进度条,以及并行化运行时的每核进度条。有一个轻微的故障,有时其余的核心会立即出现,但即便如此,我认为它很有用,因为您可以获得每个核心的进度统计信息(例如,它/秒和总记录)

在此处输入图片说明

感谢@abcdaa 为这个伟大的图书馆!

  • 谢谢@mork - 随意添加到 https://github.com/tqdm/tqdm/wiki/How-to-make-a-great-Progress-Bar 或在 https://github.com/ 创建一个新页面tqdm/tqdm/维基 (2认同)

Nav*_*ala 11

这里的每个答案都使用了pandas.DataFrame.groupby. 如果你想在没有 groupby 的情况下打开进度条pandas.Series.apply,可以在 jupyter-notebook 中执行以下操作:

from tqdm.notebook import tqdm
tqdm.pandas()


df['<applied-col-name>'] = df['<col-name>'].progress_apply(<your-manipulation-function>)
Run Code Online (Sandbox Code Playgroud)


Vic*_*vic 7

如果您需要了解如何在Jupyter / IPython的笔记本使用此支持,像我一样,这里是一个有益的指导和源相关文章

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)
Run Code Online (Sandbox Code Playgroud)

请注意的import语句中的下划线_tqdm_notebook。正如所引用的文章所提到的,开发处于beta后期。


Jef*_*eff 5

您可以使用装饰器轻松完成此操作

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)
Run Code Online (Sandbox Code Playgroud)

然后只需使用 modified_function (并在您想要打印时更改)