jf3*_*328 36 python parallel-processing pandas dask
我最近发现dask模块旨在成为一个易于使用的python并行处理模块.对我来说最大的卖点是它与熊猫一起使用.
在其手册页上阅读了一下之后,我找不到办法完成这个简单的可并行化任务:
ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
Run Code Online (Sandbox Code Playgroud)
目前,要在dask,AFAIK实现这一目标,
ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
Run Code Online (Sandbox Code Playgroud)
这是一种丑陋的语法,实际上比直接慢
df.apply(func, axis = 1) # for pandas DF row apply
Run Code Online (Sandbox Code Playgroud)
有什么建议吗?
编辑:感谢@MRocklin的地图功能.它似乎比普通的大熊猫慢.这是与熊猫GIL发布问题有关还是我做错了?
import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
Run Code Online (Sandbox Code Playgroud)
MRo*_*lin 58
map_partitions您可以使用该函数将函数应用于数据框的所有分区map_partitions.
df.map_partitions(func, columns=...)
Run Code Online (Sandbox Code Playgroud)
请注意,func一次只会给出数据集的一部分,而不是像整个数据集一样pandas apply(如果你想做并行性,你可能不想要这个数据集.)
map/apply您可以在一个系列中逐行映射函数 map
df.mycolumn.map(func)
Run Code Online (Sandbox Code Playgroud)
您可以在数据框中逐行映射函数 apply
df.apply(func, axis=1)
Run Code Online (Sandbox Code Playgroud)
从版本0.6.0开始dask.dataframes与线程并行化.自定义Python函数不会从基于线程的并行性中获得太多好处.你可以试试过程
df = dd.read_csv(...)
df.map_partitions(func, columns=...).compute(scheduler='processes')
Run Code Online (Sandbox Code Playgroud)
apply但是,您应该apply在Pandas和Dask中使用自定义Python函数.这通常是表现不佳的原因.如果你找到一种以矢量化方式进行操作的方法,那么你的Pandas代码可能会快100倍,你根本就不需要dask.dataframe.
numba对于您可能考虑的特定问题numba.这显着提高了您的表现.
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)
In [4]: %paste
def slow_func(k):
A = np.random.normal(size = k) # k = 10000
s = 0
for a in A:
if a > 0:
s += 1
else:
s -= 1
return s
## -- End pasted text --
In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms
In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)
In [8]: %time _ = s.apply(fast_func) # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms
In [9]: %time _ = s.apply(fast_func) # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
Run Code Online (Sandbox Code Playgroud)
免责声明,我的公司,使双方的工作numba,并dask与员工的许多pandas开发人员.