并行引导,替换为xarray / dask

aar*_*ing 5 multiprocessing resampling dask python-xarray

我想执行N = 1000引导,并替换网格数据。一次计算大约需要0.5s。我可以访问具有48个内核的超级计算机专用节点。因为重采样是相互独立的,所以我天真地希望将工作负载分配到所有或至少多个内核上,并使性能提高0.8 * ncores。但是我不明白。

我仍然缺乏对敏捷的了解。基于设置敏捷工作者数量的最佳实践,我使用:

from dask.distributed import Client
client = Client(processes=False, threads_per_worker=8, n_workers=6, memory_limit=‘32GB')
Run Code Online (Sandbox Code Playgroud)

我也尝试过SLURMCluster,但我想我首先需要了解自己的工作,然后进行扩展。

我的MWE:

  1. 创建样本数据
  2. 我想申请的写功能
  3. 编写重采样init函数
  4. 使用引导程序(= N)作为参数编写引导功能:请参阅下面的许多实现
  5. 执行引导
import dask
import numpy as np
import xarray as xr
from dask.distributed import Client

inits = np.arange(50)
lats = np.arange(96)
lons = np.arange(192)
data = np.random.rand(len(inits), len(lats), len(lons))
a = xr.DataArray(data,
                        coords=[inits, lats, lons],
                        dims=['init', 'lat', 'lon'])

data = np.random.rand(len(inits), len(lats), len(lons))
b = xr.DataArray(data,
                        coords=[inits, lats, lons],
                        dims=['init', 'lat', 'lon'])

def func(a,b, dim='init'):
    return (a-b).std(dim)

bootstrap=96

def resample(a):
    smp_init = np.random.choice(inits, len(inits))
    smp_a = a.sel(init=smp_init)
    smp_a['init'] = inits
    return smp_a


# serial function
def bootstrap_func(bootstrap=bootstrap):
    res = (func(resample(a),b) for _ in range(bootstrap))
    res = xr.concat(res,'bootstrap')
    # leave out quantile because not issue here yet
    #res_ci = res.quantile([.05,.95],'bootstrap')
    return res


@dask.delayed
def bootstrap_func_delayed_decorator(bootstrap=bootstrap):
    return bootstrap_func(bootstrap=bootstrap)


def bootstrap_func_delayed(bootstrap=bootstrap):
    res = (dask.delayed(func)(resample(a),b) for _ in range(bootstrap))
    res = xr.concat(dask.compute(*res),'bootstrap')
    #res_ci = res.quantile([.05,.95],'bootstrap')
    return res

for scheduler in ['synchronous','distributed','multiprocessing','processes','single-threaded','threads']:
    print('scheduler:',scheduler)

    def bootstrap_func_delayed_processes(bootstrap=bootstrap):
        res = (dask.delayed(func)(resample(a),b) for _ in range(bootstrap))
        res = xr.concat(dask.compute(*res, scheduler=scheduler),'bootstrap')
        res = res.quantile([.05,.95],'bootstrap')
        return res

    %time c = bootstrap_func_delayed_processes()
Run Code Online (Sandbox Code Playgroud)

以下结果来自我的4核笔记本电脑。但是在超级计算机上,我也看不到加速,而是降低了50%。

序列结果:

%time c = bootstrap_func()
CPU times: user 814 ms, sys: 58.7 ms, total: 872 ms
Wall time: 862 ms

Run Code Online (Sandbox Code Playgroud)

并行结果:

%time c = bootstrap_func_delayed_decorator().compute()
CPU times: user 96.2 ms, sys: 50 ms, total: 146 ms
Wall time: 906 ms

Run Code Online (Sandbox Code Playgroud)

从循环并行化的结果:

scheduler: synchronous
CPU times: user 2.57 s, sys: 330 ms, total: 2.9 s
Wall time: 2.95 s
scheduler: distributed
CPU times: user 4.51 s, sys: 2.74 s, total: 7.25 s
Wall time: 8.86 s
scheduler: multiprocessing
CPU times: user 4.18 s, sys: 2.53 s, total: 6.71 s
Wall time: 7.95 s
scheduler: processes
CPU times: user 3.97 s, sys: 2.1 s, total: 6.07 s
Wall time: 7.39 s
scheduler: single-threaded
CPU times: user 2.26 s, sys: 275 ms, total: 2.54 s
Wall time: 2.47 s
scheduler: threads
CPU times: user 2.84 s, sys: 341 ms, total: 3.18 s
Wall time: 2.66 s
Run Code Online (Sandbox Code Playgroud)

预期结果:-提速(.8 * ncores)

其他注意事项:-我还检查了是否应该对数据进行分块。样本块也太多。分块数组需要更长的时间。

我的问题:-我对dask并行化有什么误解?-这样的客户端设置没有用吗?-我实现了dask.delayed不够聪明吗?-我的串行功能是否已经因为dask并行执行?我觉得不是。