我在并行化 groupby 上发现了这个问题。但是,它不能一对一地转换为有多个参数的情况 - 除非我弄错了。
以下是正确的做法吗?有没有更好的办法?(尤其是获取索引似乎效率很低)。
def applyParallel(dfGrouped, func, *args):
with Pool(cpu_count() - 2) as p:
ret_list = p.starmap(func, zip([group for name, group in dfGrouped], repeat(*args)))
index = [name for name, group in dfGrouped]
return pd.Series(index=index, data=ret_list)
Run Code Online (Sandbox Code Playgroud)
哪一个会调用 using applyParallel(df.groupby(foo), someFunc, someArgs).
首先需要注意的是,除非您的数据相当大,否则您可能看不到并行化的太多(或任何)好处。
现在最简单的方法是尝试,而不是直接使用多处理池,dask它提供了类似 pandas 的 api,主要为您管理并行性。
df = pd.DataFrame(np.random.randn(10000000, 10), columns=list('qwertyuiop'))
df['key'] = np.random.randint(0, 100, size=len(df))
import dask.dataframe as dd
# want a partition size small enough to easily fit into memory
# but large enough to make the overhead worth it
ddf = dd.from_pandas(df, npartitions=4)
%timeit df.groupby('key').sum()
1 loop, best of 3: 1.05 s per loop
# calculated in parallel on the 4 partitions
%timeit ddf.groupby('key').sum().compute()
1 loop, best of 3: 695 ms per loop
Run Code Online (Sandbox Code Playgroud)
请注意,默认情况下,dask 对数据帧使用基于线程的调度程序,这对于sum释放 GIL 等函数来说速度更快。如果您正在应用自定义 python 函数(这将需要 GIL),您可能会看到多处理计划的更好性能。
dask.set_options(get=dask.multiprocessing.get)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3425 次 |
| 最近记录: |