如何在一台机器上使用所有内核的Pandas Dataframes并行化apply()?

Rok*_*jic 68 pandas dask

截至2017年8月,不幸的是,Pandas DataFame.apply()仅限于使用单核,这意味着多核机器在运行时将浪费大部分计算时间df.apply(myfunc, axis=1).

如何使用所有核心并行运行应用于数据帧?

Rok*_*jic 80

最简单的方法是使用Dask的map_partitions.你需要这些导入(你需要pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get
Run Code Online (Sandbox Code Playgroud)

而语法是

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  
Run Code Online (Sandbox Code Playgroud)

(如果您有16个核心,我相信30是一个合适的分区数量).为了完整起见,我在机器上计算了差异(16个核心):

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))
Run Code Online (Sandbox Code Playgroud)

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))
Run Code Online (Sandbox Code Playgroud)

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))
Run Code Online (Sandbox Code Playgroud)

0.010668013244867325

给予10加速的因素,从熊猫去申请DASK适用于分区.当然,如果你有一个可以矢量化的函数,你应该 - 在这种情况下,函数(y*(x**2+1))很容易被矢量化,但是有很多东西是不可能矢量化的.

  • 唯一的问题是`get =关键字已被弃用.请使用scheduler =关键字,而不是所需的调度程序的名称,如'threads'或'processes' (7认同)
  • 很高兴知道,感谢发布.你能解释为什么选择30个分区吗?更改此值时性能是否会发生变化? (2认同)
  • @AndrewL我假设每个分区都由一个单独的进程提供服务,并且有16个核心我假设16或32个进程可以同时运行.我试了一下,性能似乎提高了32个分区,但进一步增加没有任何有益效果.我假设使用四核机器你会想要8个分区等.注意我确实注意到16和32之间的一些改进,所以我认为你真的想要2x $ NUM_PROCESSORS (2认同)
  • 对于dask v0.20.0及更高版本,请使用ddata.map_partitions(lambda df:df.apply((lambda row:myfunc(* row)),axis = 1))。compute(scheduler ='processes')或以下之一其他调度程序选项。当前代码抛出“ TypeError:get =关键字已被删除。请使用scheduler =关键字,而不是所需的调度程序的名称,例如'threads'或'processes'。” (2认同)
  • 在执行此操作之前,请确保数据框没有重复的索引,因为它会抛出“ValueError:无法从重复轴重新索引”。要解决这个问题,要么你应该通过`df = df[~df.index.duplicated()]`删除重复的索引,或者通过`df.reset_index(inplace=True)`重置你的索引。 (2认同)

slh*_*hck 45

你可以使用这个swifter包:

pip install swifter
Run Code Online (Sandbox Code Playgroud)

它作为pandas的插件,允许您重用该apply功能:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)
Run Code Online (Sandbox Code Playgroud)

它将自动找出并行化函数的最有效方法,无论它是否为矢量化(如上例所示).

GitHub上提供了更多示例性能比较.请注意,该软件包正处于活动开发阶段,因此API可能会发生变化.

  • 对于字符串,只需添加“allow_dask_on_strings(enable=True)”,如下所示:“df.swifter.allow_dask_on_strings(enable=True).apply(some_function)”来源:https://github.com/jmcarpenter2/swifter/issues/45 (3认同)
  • 我们纯粹出于好奇,在执行并行应用时,有没有办法限制它使用的内核数量?我有一个共享服务器,所以如果我获取所有 32 个内核,没有人会高兴。 (2认同)
  • 斯威特+1。它不仅使用最佳可用方法进行并行化,还通过 tqdm 集成进度条。 (2认同)

Oli*_*ant 24

如果你想留在原生 python 中:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])
Run Code Online (Sandbox Code Playgroud)

将以f并行方式将函数应用于col数据框列df

  • 您可以将 pool.map 写入中间 temp_result 列表,以允许检查长度是否与 df 匹配,然后执行 df['newcol'] = temp_result? (2认同)

LYu*_*LYu 15

只是想给出Dask的更新答案

import dask.dataframe as dd

def your_func(row):
  #do something
  return row

ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()
Run Code Online (Sandbox Code Playgroud)

在我的 100,000 条记录中,没有 Dask:

CPU 时间:用户 6 分钟 32 秒,系统:100 毫秒,总计:6 分钟 32 秒 墙上时间:6 分钟 32 秒

与达斯克:

CPU 时间:用户 5.19 秒,系统:784 毫秒,总计:5.98 秒 墙上时间:1 分钟 3 秒


G_K*_*IEF 12

您可以pandarallel改用:一种简单高效的工具,可在所有CPU上并行化熊猫操作(在Linux和macOS上)

  • 并行化具有成本(初始化新进程,通过共享内存发送数据等),因此,只有在要进行并行化的计算量足够高的情况下,并行化才有效。对于很少的数据,使用并行化并不总是值得的。
  • 应用的函数不应是lambda函数。
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)
Run Code Online (Sandbox Code Playgroud)

参见https://github.com/nalepae/pandarallel

  • @G_KOBELIEF 使用 Python &gt;3.6 我们可以将 lambda 函数与 pandaparallel 一起使用 (2认同)

Dav*_* L. 5

要使用所有(物理或逻辑)内核,您可以尝试mapply替代swifterpandarallel

您可以在初始化时设置内核数量(和分块行为):

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)
Run Code Online (Sandbox Code Playgroud)

默认情况下 ( n_workers=-1),程序包使用系统上可用的所有物理 CPU。如果您的系统使用超线程(通常会显示物理 CPU 数量的两倍),mapply则会产生一个额外的工作线程,以将多处理池优先于系统上的其他进程。

根据您对 的定义all your cores,您还可以改用所有逻辑内核(请注意,像这样受 CPU 限制的进程将争夺物理 CPU,这可能会减慢您的操作速度):

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)
Run Code Online (Sandbox Code Playgroud)