并行化在pandas groupby之后应用

Iva*_*van 47 python parallel-processing rosetta pandas

我已经使用rosetta.parallel.pandas_easy并行分组后应用,例如:

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)
Run Code Online (Sandbox Code Playgroud)

但是,有没有人想出如何并行化返回数据帧的函数?正如预期的那样,此代码对于rosetta失败.

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
Run Code Online (Sandbox Code Playgroud)

Iva*_*van 91

这似乎有效,虽然它真的应该被内置到大熊猫中

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
    print 'parallel version: '
    print applyParallel(df.groupby(df.index), tmpFunc)

    print 'regular version: '
    print df.groupby(df.index).apply(tmpFunc)

    print 'ideal version (does not work): '
    print df.groupby(df.index).applyParallel(tmpFunc)
Run Code Online (Sandbox Code Playgroud)

  • 请小心,这最终可能会比单核版本慢!如果您向每个作业发送大量数据但只有很短的计算,则不值得花费这些开销,而且最终会变慢。 (4认同)
  • 你知道在将并行化纳入 Pandas 方面是否有任何进展? (3认同)
  • 通过对函数进行小的修改,可以返回常规应用返回的层次索引:`def temp_func(func,name,group):return func(group),name def applyParallel(dfGrouped,func):retLst, top_index = zip(*Parallel(n_jobs = multiprocessing.cpu_count())(名称的延迟(temp_func)(func,name,group),dfGrouped中的group))返回pd.concat(retLst,keys = top_index)`Dang,I无法弄清楚如何在评论中发布代码... (3认同)
  • 您应该能够通过将 `applyParallel` 绑定到 `df` 来使“理想版本”工作:`from types import MethodType; df.applyParallel = MethodType(applyParallel, df)` (2认同)
  • 我已经尝试过这种方法,但它没有使用所有可用的 cpu,它只使用 1 或 2 个,即使我有 8 个 - 有人遇到过这种情况吗? (2认同)

Pie*_*ton 42

Ivan的答案很棒,但看起来它可以略微简化,也不需要依赖joblib:

from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)
Run Code Online (Sandbox Code Playgroud)

顺便说一句:这不能替换任何 groupby.apply(),但它将涵盖典型案例:例如它应该涵盖文档中的案例2和3 ,而你应该通过给出论证axis=1来获得案例1的行为.最后的pandas.concat()电话.


JD *_*ong 11

我有一个hack用于在Pandas中获得并行化.我将数据帧分成块,将每个块放入列表元素中,然后使用ipython的并行位对数据帧列表进行并行应用.然后我使用pandas concat函数将列表重新组合在一起.

然而,这通常不适用.它适用于我,因为我想要应用于数据帧的每个块的功能大约需要一分钟.将我的数据拆分并整理在一起并不需要那么长时间.所以这显然是一个障碍.话虽如此,这是一个例子.我正在使用Ipython笔记本,所以你会%%time在我的代码中看到魔法:

## make some example data
import pandas as pd

np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n), 
                   'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')
Run Code Online (Sandbox Code Playgroud)

对于这个例子,我将根据上面的groupby制作"块",但这不一定是数据的分块方式.虽然这是一种非常常见的模式.

dflist = []
for name, group in grouped:
    dflist.append(group)
Run Code Online (Sandbox Code Playgroud)

设置并行位

from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True
Run Code Online (Sandbox Code Playgroud)

写一个愚蠢的函数来应用我们的数据

def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf
Run Code Online (Sandbox Code Playgroud)

现在让我们以串行方式然后并行运行代码.先串行:

%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
Run Code Online (Sandbox Code Playgroud)

现在平行

%%time
parallel_list = lview.map(myFunc, dflist)

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
Run Code Online (Sandbox Code Playgroud)

然后只需几毫秒就可以将它们合并回一个数据帧

%%time
combinedDf = pd.concat(parallel_list)
 CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
Run Code Online (Sandbox Code Playgroud)

我在MacBook上运行了6个IPython引擎,但你可以看到它将执行时间从14秒降低到2秒.

对于真正长时间运行的随机模拟,我可以通过使用StarCluster启动集群来使用AWS后端.但是,在很多时候,我只在我的MBP上的8个CPU上并行化.


spr*_*ing 8

JD Long 回答的简短评论。我发现如果组的数量非常大(比如数十万),并且您的 apply 函数正在做一些相当简单和快速的事情,那么将您的数据帧分解成块并将每个块分配给一个工作人员来执行groupby-apply(串行)可以比并行 groupby-apply 和让工作人员读取包含多个组的队列快得多。例子:

import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
Run Code Online (Sandbox Code Playgroud)

所以我们的数据框看起来像:

    a
0   3425
1   1016
2   8141
3   9263
4   8018
Run Code Online (Sandbox Code Playgroud)

请注意,“a”列有许多组(想想客户 ID):

len(df.a.unique())
15000
Run Code Online (Sandbox Code Playgroud)

对我们的组进行操作的函数:

def f1(group):
    time.sleep(0.0001)
    return group
Run Code Online (Sandbox Code Playgroud)

启动池:

ppe = ProcessPoolExecutor(12)
futures = []
results = []
Run Code Online (Sandbox Code Playgroud)

做一个并行的 groupby-apply:

%%time

for name, group in df.groupby('a'):
    p = ppe.submit(f1, group)
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results)
del ppe

CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
Run Code Online (Sandbox Code Playgroud)

现在让我们添加一列,将 df 划分为更少的组:

df['b'] = np.random.randint(0, 12, nrows)
Run Code Online (Sandbox Code Playgroud)

现在只有 12 个组而不是 15000 个组:

len(df.b.unique())
12
Run Code Online (Sandbox Code Playgroud)

我们将对我们的 df 进行分区并对每个块进行 groupby-apply。

ppe = ProcessPoolExecutor(12)
Run Code Online (Sandbox Code Playgroud)

包装乐趣:

def f2(df):
    df.groupby('a').apply(f1)
    return df
Run Code Online (Sandbox Code Playgroud)

串行发送每个要操作的chunk:

%%time

for i in df.b.unique():
    p = ppe.submit(f2, df[df.b==i])
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results) 

CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
Run Code Online (Sandbox Code Playgroud)

请注意,每组花费的时间没有改变。相反,改变的是工作人员从中读取的队列长度。我怀疑正在发生的事情是工作人员无法同时访问共享内存,并且不断返回以读取队列,因此相互踩踏。使用更大的块进行操作,工作人员返回的频率较低,因此这个问题得到改善,整体执行速度更快。


Jas*_*ter 6

免责声明:我是所有者和主要贡献者/维护者swifter

swifter是我在 4 年前创建的一个 python 包,它可以以最快的可用方式将任何函数有效地应用于 pandas 数据框或系列。截至今天,swifter拥有超过 2k GitHub star、每月 25 万次下载和 95% 的代码覆盖率。

从 v1.3.2 开始,swifter为高性能并行化 groupby 应用提供了一个简单的接口:

df.swifter.groupby(df.index).apply(tmpFunc)
Run Code Online (Sandbox Code Playgroud)

我还创建了性能基准来展示 swifter 的性能改进,并在此处复制了关键视觉效果: Swifter Groupby 应用性能基准

您可以通过 pip 轻松安装 swifter(具有 groupby 应用功能):

pip install swifter[groupby]>=1.3.2
Run Code Online (Sandbox Code Playgroud)

或通过康达:

conda install -c conda-forge swifter>=1.3.2 ray>=1.0.0
Run Code Online (Sandbox Code Playgroud)

请查看自述文件文档以获取更多信息


Ali*_*eza 5

人们正在转向使用 bodo 来实现并行性。它是可用于并行化 Python 的最快引擎,因为它使用 MPI 编译代码。它的新编译器使其比 Dask、Ray、多处理、pandarel 等快得多。在这篇博文中阅读 bodo 与 Dask 的比较,并查看 Travis 在他的 LinkedIn 中对 bodo 的评价!他是 Anaconda 的创始人:引用“bodo is the real deal”

https://bodo.ai/blog/performance-and-cost-of-bodo-vs-spark-dask-ray

https://www.linkedin.com/posts/teoliphant_performance-and-cost-evaluation-of-bodo-vs-activity-6873290539773632512-y5iZ/

根据如何将 groupby 与 bodo 一起使用,我在这里编写了一个示例代码:

#install bodo through your terminal

conda create -n Bodo python=3.9 -c conda-forge
conda activate Bodo
conda install bodo -c bodo.ai -c conda-forge
Run Code Online (Sandbox Code Playgroud)

这是 groupby 的代码示例:

import time
import pandas as pd
import bodo


@bodo.jit
def read_data():
""" a dataframe with 2 columns, headers: 'A', 'B' 
or you can just create a data frame instead of reading it from flat file 
"""
    return pd.read_parquet("your_input_data.pq")


@bodo.jit
def data_groupby(input_df):
    t_1 = time.time()
    df2 = input_df.groupby("A", as_index=False).sum()
    t_2 = time.time()
    print("Compute time: {:.2f}".format(t_2-t_1))
    return df2, t_2-t_1


if __name__ == "__main__":
    df = read_data()
    t0 = time.time()
    output, compute_time = data_groupby(df)
    t2 = time.time()
    total_time = t2 - t0
    if bodo.get_rank() == 0:
        print("Compilation time: {:.2f}".format(total_time - compute_time))
        print("Total time second call: {:.2f}".format(total_time))
Run Code Online (Sandbox Code Playgroud)

最后通过终端使用 mpiexec 运行它。-n 确定要运行它的核心 (CPU) 数量。

mpiexec -n 4 python filename.py
Run Code Online (Sandbox Code Playgroud)