Pandas和多处理内存管理:将DataFrame拆分为多个块

SmC*_*lar 6 python memory multiprocessing pandas python-3.5

我必须pandas.DataFrame逐行处理一个巨大的(几十GB),每行操作都很长(几十毫秒).所以我有了将框架拆分成块并使用并行处理每个块的想法multiprocessing.这确实加快了任务,但内存消耗是一场噩梦.

虽然每个子进程原则上只占用一小部分数据,但它需要(几乎)与包含原始进程的原始父进程一样多的内存DataFrame.即使删除父进程中使用过的部分也无济于事.

我写了一个复制这种行为的最小例子.它唯一能做的就是创建一个DataFrame带有随机数的大块,将它分成最多100行的小块,然后简单地打印一些关于DataFrame多处理过程中的信息(这里通过mp.Pool4号大小).

并行执行的主要功能:

def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Waste some CPU cycles
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))
Run Code Online (Sandbox Code Playgroud)

辅助生成器将一个块DataFrame分成小块:

def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)
Run Code Online (Sandbox Code Playgroud)

主要例程:

def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')
Run Code Online (Sandbox Code Playgroud)

标准输出如下所示:

Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE
Run Code Online (Sandbox Code Playgroud)

问题:

主进程需要大约120MB的内存.但是,池的子进程需要相同的内存量,尽管它们只包含原始内容的1%DataFame(大小为100的块与原始长度为10000).为什么?

我能做些什么呢?DataFrame尽管我的分块,Python(3)是否将整体发送到每个子进程?这是pandas内存管理的问题还是multiprocessing数据酸洗的错误?谢谢!



用于简单复制和粘贴的整个脚本,以防您想自己尝试:

import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import os


def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Waste some CPU cycles
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))


def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)


def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

SmC*_*lar 5

好的,所以我在 Sebastian Opa?czy?ski 在评论中的提示之后想通了。

问题是子进程是从父进程派生出来的,所以它们都包含对原始DataFrame. 但是,帧是在原始进程中操作的,因此写时复制行为会在达到物理内存限制时缓慢并最终杀死整个事物。

有一个简单的解决方案:代替pool = mp.Pool(n_jobs),我使用了新的上下文功能multiprocessing

ctx = mp.get_context('spawn')
pool = ctx.Pool(n_jobs)
Run Code Online (Sandbox Code Playgroud)

这保证了Pool进程只是衍生出来的,而不是从父进程派生出来的。因此,它们中没有一个可以访问原始文件,DataFrame并且所有这些只需要父级内存的一小部分。

请注意,mp.get_context('spawn')仅在 Python 3.4 及更新版本中可用。