Multiprocessing with pandas apply

yem*_*emu 24 python multiprocessing pandas

I'm trying to use multiprocessing with a pandas dataframe, that is, to split the dataframe into 8 parts and apply some function to each part using apply, with each part processed in a different process.

Edit: here is the solution I eventually found:

import multiprocessing as mp
import numpy as np
import pandas as pd
import pandas.testing as pdt

def process_apply(x):
    # do some stuff to data here
    return x

def process(df):
    # apply the row-wise function to this chunk of the dataframe
    res = df.apply(process_apply, axis=1)
    return res

if __name__ == '__main__':
    p = mp.Pool(processes=8)
    split_dfs = np.array_split(big_df, 8)
    pool_results = p.map(process, split_dfs)
    p.close()
    p.join()

    # merging parts processed by different processes
    parts = pd.concat(pool_results, axis=0)

    # merging newly calculated parts to big_df
    big_df = pd.concat([big_df, parts], axis=1)

    # checking if the dfs were merged correctly
    pdt.assert_series_equal(parts['id'], big_df['id'])
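For context on the two concat calls above: the first stitches the processed chunks back together along the row axis, and the second relies on index alignment to attach the result to big_df as new columns. A minimal sketch of that round trip, using a hypothetical two-column frame (the column names are illustrative only):

import numpy as np
import pandas as pd

if __name__ == '__main__':
    big_df = pd.DataFrame({'id': range(8), 'val': np.arange(8.0)})

    # split, transform each chunk, and re-concatenate along rows
    chunks = np.array_split(big_df, 4)
    parts = pd.concat([c['val'] * 2 for c in chunks], axis=0).rename('val2')

    # index alignment attaches the new column to the right rows
    big_df = pd.concat([big_df, parts], axis=1)
    print(big_df)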

小智 16

A more generic version, based on the author's solution, that allows running it with any function and dataframe:

from multiprocessing import Pool
from functools import partial
import numpy as np
import pandas as pd

def parallelize(data, func, num_of_processes=8):
    data_split = np.array_split(data, num_of_processes)
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(func, data_subset):
    return data_subset.apply(func, axis=1)

def parallelize_on_rows(data, func, num_of_processes=8):
    return parallelize(data, partial(run_on_subset, func), num_of_processes)

So this line:

df.apply(some_func, axis=1)

becomes:

parallelize_on_rows(df, some_func) 

  • @TomRaz how do I use partial in this case, when normally I would do something like this? `dataframe.apply(lambda row: process(row.attr1, row.attr2, ...))` (3 upvotes)
  • What about `some_func` with arguments? (2 upvotes)
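Regarding the comments above about extra arguments: one option is to bind them with functools.partial before passing the function in, so that only the row argument remains free. A minimal sketch, assuming a hypothetical process(row, scale) and the parallelize_on_rows defined above:

from functools import partial
import numpy as np
import pandas as pd

def process(row, scale):
    # hypothetical row function that needs one extra argument
    return row['a'] * scale + row['b']

if __name__ == '__main__':
    df = pd.DataFrame({'a': np.arange(16), 'b': np.ones(16)})
    # bind scale=10; parallelize_on_rows then calls process(row, scale=10)
    result = parallelize_on_rows(df, partial(process, scale=10))
    print(result)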

Rob*_*zel 9

Here is some code I found useful. It automatically splits the dataframe into as many chunks as you have CPU cores.

import pandas as pd
import numpy as np
import multiprocessing as mp

def parallelize_dataframe(df, func):
    num_processes = mp.cpu_count()
    df_split = np.array_split(df, num_processes)
    with mp.Pool(num_processes) as p:
        df = pd.concat(p.map(func, df_split))
    return df

def parallelize_function(df):
    # 'column_input' and 'column_output' are placeholder column names;
    # replace them with real columns from your dataframe
    df['column_output'] = df['column_input'].apply(example_function)
    return df

def example_function(x):
    # toy per-value transformation
    x = x * 2
    return x

To run it:

df_output = parallelize_dataframe(df, parallelize_function)
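A quick usage sketch under the assumption that the dataframe really has a column named 'column_input' (both column names above are placeholders):

import pandas as pd

if __name__ == '__main__':
    df = pd.DataFrame({'column_input': range(100)})
    df_output = parallelize_dataframe(df, parallelize_function)
    print(df_output.head())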


小智 7

You can use https://github.com/nalepae/pandarallel, as in the following example:

from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

def func(row):
    # with axis=1, func receives a row as a pandas Series;
    # 'a' is a placeholder numeric column name
    return sin(row['a']**2)

# df is an existing DataFrame with a numeric column 'a'
df.parallel_apply(func, axis=1)

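As an aside, pandarallel.initialize() also accepts options such as nb_workers and progress_bar (check the project README for the exact parameters supported by your version):

from pandarallel import pandarallel

# optional knobs; names as documented in the pandarallel README
pandarallel.initialize(nb_workers=4, progress_bar=True)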

  • This solution works natively on Linux and macOS. On Windows, Pandaral·lel works only when the Python session is executed from the [Windows Subsystem for Linux (WSL)](https://docs.microsoft.com/en-us/windows/wsl/install-win10). (12 upvotes)
  • This answer deserves more upvotes. The speedup is really amazing. (11 upvotes)
  • On Windows I get this error: `ValueError: cannot find context for 'fork'` (2 upvotes)

Eli*_*adL 5

This worked well for me:

import multiprocessing

# pool.map preserves input order, so the results line up with the dataframe rows
rows_iter = (row for _, row in df.iterrows())

with multiprocessing.Pool() as pool:
    df['new_column'] = pool.map(process_apply, rows_iter)
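A self-contained version of this pattern, assuming a hypothetical process_apply that sums two columns:

import multiprocessing
import pandas as pd

def process_apply(row):
    # hypothetical per-row computation
    return row['a'] + row['b']

if __name__ == '__main__':
    df = pd.DataFrame({'a': [1, 2, 3], 'b': [10, 20, 30]})
    rows_iter = (row for _, row in df.iterrows())
    with multiprocessing.Pool() as pool:
        df['new_column'] = pool.map(process_apply, rows_iter)
    print(df)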


Raf*_*ros 4

Since I don't have much of your data script, this is a guess, but I'd suggest using p.map instead of apply_async with a callback.

p = mp.Pool(8)
# process and big_df refer to the function and dataframe from the question
pool_results = p.map(process, np.array_split(big_df, 8))
p.close()
p.join()

# flatten the per-chunk results into a single list
results = []
for result in pool_results:
    results.extend(result)
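Since each element of pool_results is itself a pandas object, an alternative to the flattening loop is to merge them with pd.concat (a sketch, assuming the same pool_results as above):

import pandas as pd

# stitch the per-chunk results back into a single pandas object
merged = pd.concat(pool_results)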