Converting a numpy solution to dask (numpy indexing doesn't work in dask)

pat*_*987 16 python numpy dask dask-distributed

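I'm trying to convert my Monte Carlo simulation from numpy to dask, because sometimes the arrays are too big to fit in memory. So I set up a cluster of machines in the cloud: my dask cluster has 24 cores and 94 GB of memory. I prepared a simplified version of my code for this question.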

My original numpy code looks like this:

```python
import numpy as np

def numpy_way(sim_count, sim_days, hist_days):
    historical_data = np.random.normal(111.51, 10, hist_days)
    historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
    historical_multidim[:, :, :, :] = historical_data

    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(1, 1, sim_count, sim_days))
    future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                       np.arange(1)[:, np.newaxis, np.newaxis],
                                       np.arange(sim_count)[:, np.newaxis],
                                       random_days_panel]
    return future_panel.shape
```

Note: I'm only returning the shape of the numpy array here (but since this is numpy, the elements of future_panel are actually computed in memory).

A few words about the function:

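  • I create a random array, historical_data; it is only 1D.
  • That array is then "broadcast" into a 4D array (historical_multidim). The first two dimensions aren't used here (but they are in my final application).
    • The third dimension is the number of simulations.
    • The fourth dimension is the day axis (historical days in historical_multidim, forecast days in the result).
  • random_days_panel is just an ndarray of randomly chosen days, so its shape is (1, 1, sim_count, sim_days) (as explained in the previous point).
  • future_panel is an ndarray of values randomly picked from historical_multidim, i.e. an array generated from the historical data with the expected shape (1, 1, sim_count, sim_days).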
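Because every simulation row of historical_multidim is an identical copy of historical_data, the 4D fancy indexing above is equivalent to a plain 1D gather, historical_data[random_days_panel]. A small check of that claim, with hypothetical small sizes and np.broadcast_to swapped in for the explicit copy:

```python
import numpy as np

# Hypothetical small sizes, only for checking the equivalence
sim_count, sim_days, hist_days = 4, 6, 10
rng = np.random.default_rng(0)

historical_data = rng.normal(111.51, 10, hist_days)

# Read-only broadcast view instead of np.empty + assignment; same values
historical_multidim = np.broadcast_to(historical_data, (1, 1, sim_count, hist_days))

random_days_panel = rng.integers(1, hist_days, size=(1, 1, sim_count, sim_days))

# Fancy indexing exactly as in numpy_way
future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                   np.arange(1)[:, np.newaxis, np.newaxis],
                                   np.arange(sim_count)[:, np.newaxis],
                                   random_days_panel]

# All rows are identical, so this reduces to indexing the 1D data directly
shortcut = historical_data[random_days_panel]
```

If this holds for the real application too, the 4D broadcast step is not needed to produce the values, only the final shape.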

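Now, the problem is that some of these steps are not implemented in dask:

  • historical_multidim[:, :, :, :] = historical_data: stack or broadcast_to is recommended instead
  • the fancy indexing used to build future_panel is not possible in dask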
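For the first point, a minimal sketch of the broadcast_to replacement (sizes are hypothetical): instead of allocating an empty 4D array and assigning into it, the 4D layout is described lazily and only built when computed.

```python
import dask.array as da

sim_count, hist_days = 3, 5  # hypothetical small sizes

historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')

# Lazy stand-in for `historical_multidim[:, :, :, :] = historical_data`:
# each (0, 0, i, :) row repeats the same 1D data; the 4D array exists only after compute
historical_multidim = da.broadcast_to(historical_data, (1, 1, sim_count, hist_days))

# Compute both in one call so they come from the same random draw
hist, full = da.compute(historical_data, historical_multidim)
```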

So I came up with this solution:

```python
import numpy as np
import dask.array as da

def dask_way_1d(sim_count, sim_days, hist_days):
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')

    def get_random_days_1d():
        return np.random.randint(low=1, high=hist_days, size=sim_days)

    future_simulations = [historical_data[get_random_days_1d()] for _ in range(sim_count)]
    future_panel = da.stack(future_simulations)
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
```

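This solution works, but it is much slower than the numpy solution. The problem is that get_random_days_1d() returns a numpy array. I tried using a dask array instead, but then I get an error when computing historical_data[get_random_days_1d()] -> KilledWorker: ("('normal-932553ab53ba4c7e908d61724430bbb2', 0)", ...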
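One plausible contributor to the slowdown (an assumption, not profiled here) is that dask_way_1d creates sim_count separate indexing operations, so the task graph grows with the number of simulations. A hypothetical variant, dask_way_flat (an illustrative name, not benchmarked), draws all day indices at once, performs a single 1D gather with a flat numpy index array, and reshapes:

```python
import numpy as np
import dask.array as da

def dask_way_flat(sim_count, sim_days, hist_days):
    """Hypothetical variant of dask_way_1d: one gather instead of sim_count gathers."""
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')

    # Draw every random day up front as a single 2D numpy index array
    random_days = np.random.randint(low=1, high=hist_days, size=(sim_count, sim_days))

    # 1D fancy indexing with a flat numpy integer array, then back to 2D
    flat = historical_data[random_days.ravel()]
    future_panel = flat.reshape((sim_count, sim_days))

    return da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
```

Usage: dask_way_flat(100, 30, 1000).compute() should return an array of shape (1, 1, 100, 30). Whether this beats numpy at the question's sizes would need measuring.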

Another solution looks like this:

```python
import numpy as np
import dask.array as da

def dask_way_nd(sim_count, sim_days, hist_days):
    historical_data_1d = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    historical_data_2d = da.broadcast_to(historical_data_1d, shape=(sim_count, hist_days))

    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(sim_count, sim_days))

    future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel]
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
```

This solution fails at future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel] -> the error is: NotImplementedError: Don't yet support nd fancy indexing
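Dask arrays also provide a .vindex accessor for pointwise, numpy-style advanced indexing where the index arrays are broadcast against each other. A minimal sketch of the failing line rewritten with it; the sizes are hypothetical, and the historical data is built with da.from_array only so the result can be compared against plain numpy:

```python
import numpy as np
import dask.array as da

sim_count, sim_days, hist_days = 4, 6, 10  # hypothetical small sizes

historical_np = np.random.normal(111.51, 10, size=hist_days)
historical_data_2d = da.broadcast_to(da.from_array(historical_np, chunks=5),
                                     shape=(sim_count, hist_days))

random_days_panel = np.random.randint(low=1, high=hist_days, size=(sim_count, sim_days))
rows = np.arange(sim_count)[:, np.newaxis]

# .vindex broadcasts `rows` (sim_count, 1) against `random_days_panel` (sim_count, sim_days)
future_panel = historical_data_2d.vindex[rows, random_days_panel]
future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))

result = future_panel.compute()
# Plain-numpy reference for the same indices
expected = historical_np[random_days_panel][np.newaxis, np.newaxis]
```

This removes the NotImplementedError; it does not by itself address the performance question.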

So my question is: is there a way to get the same behavior as my numpy code? Of course, I'd also like better performance (i.e. faster execution time).

Fir*_*ger 1

Chunk random_days_panel instead of historical_data, and use da.map_blocks:

```python
import numpy as np
import dask.array as da

def dask_way(sim_count, sim_days, hist_days):
    # shared historical data
    # on a cluster you'd load this on each worker, e.g. from a NPZ file
    historical_data = np.random.normal(111.51, 10, size=hist_days)

    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )
    future_panel = da.map_blocks(
        lambda chunk: historical_data[chunk], random_days_panel, dtype=float
    )

    future_panel.compute()

    return future_panel
```
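The same idea with a named block function and explicit chunks, to make visible that each task receives one block of day indices and does a plain numpy gather. lookup_days, the table= keyword, and the chunk size of 2 simulations per block are hypothetical choices for illustration; map_blocks forwards extra keyword arguments to the function:

```python
import numpy as np
import dask.array as da

def lookup_days(day_chunk, table):
    # Runs once per block: numpy gather of this block's day indices
    return table[day_chunk]

sim_count, sim_days, hist_days = 8, 5, 20  # hypothetical small sizes
historical_data = np.random.normal(111.51, 10, size=hist_days)

# Chunk the index panel along the simulation axis: 2 simulations per block
random_days_panel = da.random.randint(1, hist_days, size=(1, 1, sim_count, sim_days),
                                      chunks=(1, 1, 2, sim_days))
future_panel = da.map_blocks(lookup_days, random_days_panel,
                             table=historical_data, dtype=float)

# Compute both together so the indices and the result come from the same graph
days, panel = da.compute(random_days_panel, future_panel)
```

Since historical_data is small compared to the panel, shipping it with the tasks is cheap, while the large output is split across blocks.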

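This delegates all the work to the workers, and it becomes faster than pure numpy once your problem is large enough to amortize the initial (constant) cost of starting the scheduler and distributing the context: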

```
hist_days = int(1e6)
sim_days = int(1e5)
sim_count = int(1000)

# dask_way(sim_count, sim_days, hist_days)
# (code in this answer)
532 ms ± 53.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# numpy_way(sim_count, sim_days, hist_days)
# (code in the question)
4.47 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# dask_way_1d(sim_count, sim_days, hist_days)
# (code in the question)
5.76 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
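Some rough memory arithmetic for these benchmark sizes, assuming float64 data: numpy_way allocates historical_multidim with shape (1, 1, sim_count, hist_days) before gathering anything, while the final panel is ten times smaller. That large up-front allocation plausibly accounts for part of the numpy timing, though this was not measured separately:

```python
# Benchmark sizes from the answer
hist_days = int(1e6)
sim_days = int(1e5)
sim_count = 1000
bytes_per_float64 = 8

# numpy_way: historical_multidim has shape (1, 1, sim_count, hist_days)
historical_multidim_gb = sim_count * hist_days * bytes_per_float64 / 1e9
# Every variant materializes future_panel, shape (1, 1, sim_count, sim_days), on compute
future_panel_gb = sim_count * sim_days * bytes_per_float64 / 1e9

print(historical_multidim_gb)  # 8.0
print(future_panel_gb)         # 0.8
```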