python dask DataFrame,支持(平凡可并行化)行适用?

jf3*_*328 36 python parallel-processing pandas dask

我最近发现dask模块旨在成为一个易于使用的python并行处理模块.对我来说最大的卖点是它与熊猫一起使用.

在其手册页上阅读了一下之后,我找不到办法完成这个简单的可并行化任务:

ts.apply(func) # for pandas series
df.apply(func, axis = 1) # for pandas DF row apply
Run Code Online (Sandbox Code Playgroud)

目前,要在dask,AFAIK实现这一目标,

ddf.assign(A=lambda df: df.apply(func, axis=1)).compute() # dask DataFrame
Run Code Online (Sandbox Code Playgroud)

这是一种丑陋的语法,实际上比直接慢

df.apply(func, axis = 1) # for pandas DF row apply
Run Code Online (Sandbox Code Playgroud)

有什么建议吗?

编辑:感谢@MRocklin的地图功能.它似乎比普通的大熊猫慢.这是与熊猫GIL发布问题有关还是我做错了?

import dask.dataframe as dd
s = pd.Series([10000]*120)
ds = dd.from_pandas(s, npartitions = 3)

def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s

s.apply(slow_func) # 0.43 sec
ds.map(slow_func).compute() # 2.04 sec
Run Code Online (Sandbox Code Playgroud)

MRo*_*lin 58

map_partitions

您可以使用该函数将函数应用于数据框的所有分区map_partitions.

df.map_partitions(func, columns=...)
Run Code Online (Sandbox Code Playgroud)

请注意,func一次只会给出数据集的一部分,而不是像整个数据集一样pandas apply(如果你想做并行性,你可能不想要这个数据集.)

map/apply

您可以在一个系列中逐行映射函数 map

df.mycolumn.map(func)
Run Code Online (Sandbox Code Playgroud)

您可以在数据框中逐行映射函数 apply

df.apply(func, axis=1)
Run Code Online (Sandbox Code Playgroud)

线程与进程

从版本0.6.0开始dask.dataframes与线程并行化.自定义Python函数不会从基于线程的并行性中获得太多好处.你可以试试过程

df = dd.read_csv(...)

df.map_partitions(func, columns=...).compute(scheduler='processes')
Run Code Online (Sandbox Code Playgroud)

但要避免 apply

但是,您应该apply在Pandas和Dask中使用自定义Python函数.这通常是表现不佳的原因.如果你找到一种以矢量化方式进行操作的方法,那么你的Pandas代码可能会快100倍,你根本就不需要dask.dataframe.

考虑 numba

对于您可能考虑的特定问题numba.这显着提高了您的表现.

In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: s = pd.Series([10000]*120)

In [4]: %paste
def slow_func(k):
    A = np.random.normal(size = k) # k = 10000
    s = 0
    for a in A:
        if a > 0:
            s += 1
        else:
            s -= 1
    return s
## -- End pasted text --

In [5]: %time _ = s.apply(slow_func)
CPU times: user 345 ms, sys: 3.28 ms, total: 348 ms
Wall time: 347 ms

In [6]: import numba
In [7]: fast_func = numba.jit(slow_func)

In [8]: %time _ = s.apply(fast_func)  # First time incurs compilation overhead
CPU times: user 179 ms, sys: 0 ns, total: 179 ms
Wall time: 175 ms

In [9]: %time _ = s.apply(fast_func)  # Subsequent times are all gain
CPU times: user 68.8 ms, sys: 27 µs, total: 68.8 ms
Wall time: 68.7 ms
Run Code Online (Sandbox Code Playgroud)

免责声明,我的公司,使双方的工作numba,并dask与员工的许多pandas开发人员.

  • @BobHaffner不知道。建议做一个小实验,并发布一个“为什么会这样”样式的stackoverflow问题。 (2认同)