Pandas 并行 groupBy 消耗大量内存

Man*_*iro 5 python group-by pandas

我有一个中等大小的文件(~300MB),其中包含个人列表(~300k)以及他们执行的操作。我正在尝试使用此处描述groupBy的并行化版本对每个人应用一个操作。它看起来像这样apply

import pandas
import multiprocessing
from joblib import Parallel, delayed

df = pandas.read_csv(src)
patients_table_raw = apply_parallel(df.groupby('ID'), f)

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)
Run Code Online (Sandbox Code Playgroud)

但不幸的是,这消耗了大量的空间。我认为这与以下简单命令有关:

list_groups = list(df.groupby('ID'))
Run Code Online (Sandbox Code Playgroud)

消耗几GB内存!如何进行?我最初的想法是在小“堆栈”中迭代 groupBy,而不消耗太多内存(但我没有找到一种方法来做到这一点而不将其转换为列表)。

更详细的上下文

我有一个简单的 CSV 数据集,格式如下:

|-------------------------|
| ID | Timestamp | Action |
|-------------------------|
|1   | 0         | A      |
|1   | 10        | B      |
|1   | 20        | C      |
|2   | 0         | B      |
|2   | 15        | C      |
         ...
Run Code Online (Sandbox Code Playgroud)

我基本上想做的是创建一个不同的表,其中包含个人及其 ID 的操作/时间戳序列的描述。这将帮助我找回个人

|------------------|
| ID | Description |
|------------------|
|1   | 0A10B20C    |
|2   | 0B15C       |
         ...
Run Code Online (Sandbox Code Playgroud)

为了做到这一点,并遵循 Pythonic 方式,我的想法基本上是加载 pandas DataFrame 中的第一个表,按 ID 分组,并在分组中应用一个函数,该函数返回我想要为每个组提供的表的一行(每个 ID)。然而,我的数据集中有很多个人(大约 100 万),并且 groupBy 操作非常昂贵(没有显式垃圾收集,正如我在自己的答案中提到的)。另外,并行化 groupBy 意味着大量的内存使用,因为显然有些事情会重复。

因此,更详细的问题是:如何使用 groupBy(从而使数据处理比实现自己的循环更快)并且不会产生巨大的内存开销?

Man*_*iro 2

一些评论,然后是我找到的解决方案:

  • 我已经尝试过dask,但没有太大区别。我猜这是因为文件不够大,无法使用辅助内存。

  • 如果您在应用于组的函数内执行垃圾收集,则内存性能会显着提高。gc.collect()我已经成功地通过每 10000 美元交互发生的简单操作来做到这一点。就像是:

    x['ID'].head(1).values[0] % 10000 == 0:
        gc.collect()
    
    Run Code Online (Sandbox Code Playgroud)
  • 垃圾收集实际上使我的并行版本运行。但这return pd.concat(retLst)又是一个巨大的瓶颈,消耗了大量的内存!

我的最终解决方案是以外部方式并行化解决方案:

  • 我创建了一个函数,它将执行 groupBy 并申请 ID 在 [X,Y] 范围内的个人

  • 我只是创建一个池并并行运行它们。每个进程根据其范围使用不同的名称保存文件

    f = functools.partial(make_patient_tables2, src="in", dest="out")
    range_of = [(0, 10000), (10000, 20000), (20000, 30000)]
    with Pool(cpu_count()) as p:
        ret_list = p.map(f, range_of)
    
    Run Code Online (Sandbox Code Playgroud)
  • 最后但并非最不重要的一点是,我连接了所有生成的文件。

请注意,这仍然有点占用内存,因为我们必须复制表的读取(这是在make_ Patient_tables2内完成的,但无论如何都会发生,因为多处理不共享资源。因此,更好的解决方案将涉及共享资源,但是垃圾收集器+不使用concat+复制原始数据仅2-3次对我来说就足够了!

当然不漂亮。希望它对其他人有帮助。