从多进程切换到多线程 Dask.DataFrame

zhc*_*zhc 4 python multithreading dataframe pandas dask

我有一个关于使用 dask 并行化我的代码的问题。我有一个熊猫数据框和 8 核 CPU。所以我想逐行应用一些函数。这是示例:

import dask.dataframe as dd
from dask.multiprocessing import get
# o - is pandas DataFrame
o['dist_center_from'] = dd.from_pandas(o, npartitions=8).map_partitions(lambda df: df.apply(lambda x: vincenty((x.fromlatitude, x.fromlongitude), center).km, axis=1)).compute(get=get)
Run Code Online (Sandbox Code Playgroud)

该代码同时运行 8 个 CPU。现在,我有一个问题,每个进程都吃很多内存,就像主进程一样。所以,我想用共享内存多线程运行它。我尝试更改from dask.multiprocessing import getfrom dask.threaded import get. 但它并没有使用我所有的 CPU,而且我认为它在单核上运行。

MRo*_*lin 7

是的,这是线程和进程之间的权衡:

  • 线程:只有在使用非 Python 代码时才能很好地并行化(大多数 Pandas API 用于数字数据而不是 apply)
  • 进程:需要在进程之间复制数据