我正在尝试使用 dask 进行一些令人尴尬的并行处理。出于某种原因,我必须使用 dask,但使用multiprocessing.Pool(5).map.
例如:
import dask
from dask import compute, delayed
def do_something(x): return x * x
data = range(10)
delayed_values = [delayed(do_something)(x) for x in data]
results = compute(*delayed_values, scheduler='processes')
Run Code Online (Sandbox Code Playgroud)
它有效,但显然它只使用一个过程。
如何配置 dask 以便它使用 5 个进程池进行此计算?
您可以使用该num_workers参数来指定该compute方法的进程数。
results = compute(*delayed_values, scheduler='processes', num_workers=5)
Run Code Online (Sandbox Code Playgroud)
您可以将其配置为使用自定义进程池,如下所示:
import dask
from multiprocessing.pool import Pool
dask.config.set(pool=Pool(5))
Run Code Online (Sandbox Code Playgroud)
或作为上下文管理器:
with dask.config.set(scheduler='processes', num_workers=5):
...
Run Code Online (Sandbox Code Playgroud)
您可能想阅读此dask_scheduling
或者我之前的回答