截至2017年8月,不幸的是,Pandas DataFame.apply()仅限于使用单核,这意味着多核机器在运行时将浪费大部分计算时间df.apply(myfunc, axis=1)
.
如何使用所有核心并行运行应用于数据帧?
Rok*_*jic 80
最简单的方法是使用Dask的map_partitions.你需要这些导入(你需要pip install dask
):
import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get
Run Code Online (Sandbox Code Playgroud)
而语法是
data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y,z, ...): return <whatever>
res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)
Run Code Online (Sandbox Code Playgroud)
(如果您有16个核心,我相信30是一个合适的分区数量).为了完整起见,我在机器上计算了差异(16个核心):
data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)
ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)
def vectorized(): return myfunc(data['col1'], data['col2'] )
t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))
Run Code Online (Sandbox Code Playgroud)
28.16970546543598
t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))
Run Code Online (Sandbox Code Playgroud)
2.708152851089835
t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))
Run Code Online (Sandbox Code Playgroud)
0.010668013244867325
给予10加速的因素,从熊猫去申请DASK适用于分区.当然,如果你有一个可以矢量化的函数,你应该 - 在这种情况下,函数(y*(x**2+1)
)很容易被矢量化,但是有很多东西是不可能矢量化的.
slh*_*hck 45
你可以使用这个swifter
包:
pip install swifter
Run Code Online (Sandbox Code Playgroud)
它作为pandas的插件,允许您重用该apply
功能:
import swifter
def some_function(data):
return data * 10
data['out'] = data['in'].swifter.apply(some_function)
Run Code Online (Sandbox Code Playgroud)
它将自动找出并行化函数的最有效方法,无论它是否为矢量化(如上例所示).
GitHub上提供了更多示例和性能比较.请注意,该软件包正处于活动开发阶段,因此API可能会发生变化.
Oli*_*ant 24
如果你想留在原生 python 中:
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
df['newcol'] = pool.map(f, df['col'])
Run Code Online (Sandbox Code Playgroud)
将以f
并行方式将函数应用于col
数据框列df
LYu*_*LYu 15
只是想给出Dask的更新答案
import dask.dataframe as dd
def your_func(row):
#do something
return row
ddf = dd.from_pandas(df, npartitions=30) # find your own number of partitions
ddf_update = ddf.apply(your_func, axis=1).compute()
Run Code Online (Sandbox Code Playgroud)
在我的 100,000 条记录中,没有 Dask:
CPU 时间:用户 6 分钟 32 秒,系统:100 毫秒,总计:6 分钟 32 秒 墙上时间:6 分钟 32 秒
与达斯克:
CPU 时间:用户 5.19 秒,系统:784 毫秒,总计:5.98 秒 墙上时间:1 分钟 3 秒
G_K*_*IEF 12
您可以pandarallel
改用:一种简单高效的工具,可在所有CPU上并行化熊猫操作(在Linux和macOS上)
from pandarallel import pandarallel
from math import sin
pandarallel.initialize()
# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)
# ALLOWED
def func(x):
return sin(x**2)
df.parallel_apply(func, axis=1)
Run Code Online (Sandbox Code Playgroud)
参见https://github.com/nalepae/pandarallel
要使用所有(物理或逻辑)内核,您可以尝试mapply
替代swifter
和pandarallel
。
您可以在初始化时设置内核数量(和分块行为):
import pandas as pd
import mapply
mapply.init(n_workers=-1)
...
df.mapply(myfunc, axis=1)
Run Code Online (Sandbox Code Playgroud)
默认情况下 ( n_workers=-1
),程序包使用系统上可用的所有物理 CPU。如果您的系统使用超线程(通常会显示物理 CPU 数量的两倍),mapply
则会产生一个额外的工作线程,以将多处理池优先于系统上的其他进程。
根据您对 的定义all your cores
,您还可以改用所有逻辑内核(请注意,像这样受 CPU 限制的进程将争夺物理 CPU,这可能会减慢您的操作速度):
import multiprocessing
n_workers = multiprocessing.cpu_count()
# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
29338 次 |
最近记录: |