pat*_*987 | 16 | python, numpy, dask, dask-distributed
I am trying to convert my Monte Carlo simulation from numpy to dask, because sometimes the arrays get too large to fit in memory. I have therefore set up a cluster of machines in the cloud: my dask cluster consists of 24 cores and 94 GB of memory. I have prepared a simplified version of my code for this question.
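For reference, connecting to such a cluster with dask.distributed looks roughly like the sketch below; the scheduler address is a placeholder, not part of my actual setup:

from dask.distributed import Client

# placeholder scheduler address; in practice this points at the cloud scheduler
client = Client('tcp://scheduler:8786')
print(client)  # reports the connected workers, cores and total memory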
My original numpy code looks like this:
import numpy as np

def numpy_way(sim_count, sim_days, hist_days):
    # 1-D historical series, broadcast into the 4-D panel
    historical_data = np.random.normal(111.51, 10, hist_days)
    historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
    historical_multidim[:, :, :, :] = historical_data
    # randomly chosen day indices for every simulation
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(1, 1, sim_count, sim_days))
    # fancy indexing: pick the random days per simulation
    future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                       np.arange(1)[:, np.newaxis, np.newaxis],
                                       np.arange(sim_count)[:, np.newaxis],
                                       random_days_panel]
    return future_panel.shape
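For example, with toy sizes (illustrative numbers of my own, not from the benchmarks below) the returned value is just the panel shape:

print(numpy_way(sim_count=5, sim_days=3, hist_days=10))  # (1, 1, 5, 3)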
Note: I am only returning the shape of the numpy array here (but since this is numpy, the elements of future_panel are evaluated in memory).

A few words about the function:

- historical_data – this is only a 1-D array
- historical_multidim – holds the historical data broadcast into four dimensions; the first two dimensions are not used here (but they are in my final application), the third is the number of simulations, the fourth the number of days forecasted into the future
- random_days_panel – just an ndarray of randomly chosen days, so the shape of this array is (1, 1, sim_count, sim_days) (explained in the previous point)
- future_panel – the ndarray with values randomly picked from historical_multidim, i.e. the array generated from the historical data with the expected shape (1, 1, sim_count, sim_days); a tiny shape demo follows this list
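To make the broadcasting in the fancy-indexing step concrete, a minimal numpy-only sketch with toy sizes (names of my own choosing):

import numpy as np

hist = np.arange(10) * 1.0                            # toy historical data
multi = np.broadcast_to(hist, (1, 1, 4, 10))          # (1, 1, sim_count, hist_days)
days = np.random.randint(0, 10, size=(1, 1, 4, 3))    # (1, 1, sim_count, sim_days)

# the arange index arrays broadcast against `days`, so every simulation row
# picks its own random days along the historical axis
future = multi[np.arange(1)[:, None, None, None],
               np.arange(1)[:, None, None],
               np.arange(4)[:, None],
               days]
print(future.shape)                                   # (1, 1, 4, 3)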
Now, the problem is that some of these steps are not implemented in dask:

- historical_multidim[:, :, :, :] = historical_data – using stack or broadcast_to is suggested instead
- the fancy indexing used to create future_panel is not possible in dask

So I came up with this solution:
import dask.array as da
import numpy as np

def dask_way_1d(sim_count, sim_days, hist_days):
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')

    def get_random_days_1d():
        # note: returns a plain numpy array, not a dask array
        return np.random.randint(low=1, high=hist_days, size=sim_days)

    # one indexing operation per simulation
    future_simulations = [historical_data[get_random_days_1d()] for _ in range(sim_count)]
    future_panel = da.stack(future_simulations)
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
This solution works, but it is much slower than the numpy solution. The problem is that get_random_days_1d() returns a numpy array. When I tried to use a dask array instead, I got an error while computing historical_data[get_random_days_1d()]:

KilledWorker: ("('normal-932553ab53ba4c7e908d61724430bbb2', 0)", ...
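A minimal sketch of where the slowdown comes from, assuming dask's usual graph building: the list comprehension creates at least one task per simulation, so the graph grows linearly with sim_count:

import dask.array as da

# even this tiny stacked array carries a graph that scales with the
# number of stacked slices (here 100 slices -> a couple hundred tasks)
panel = da.stack([da.random.normal(0, 1, size=10, chunks=10) for _ in range(100)])
print(len(dict(panel.__dask_graph__())))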
Another solution I tried:
def dask_way_nd(sim_count, sim_days, hist_days):
    historical_data_1d = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    historical_data_2d = da.broadcast_to(historical_data_1d, shape=(sim_count, hist_days))
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(sim_count, sim_days))
    # 2-D fancy indexing: one row of random days per simulation
    future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel]
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
This solution stops at future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel] with the error:

NotImplementedError: Don't yet support nd fancy indexing
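For illustration: 1-D fancy indexing is supported, so one possible workaround sketch (my own untested idea, not something I benchmarked) would be to flatten the n-d index array, index, and reshape the lazy result:

import numpy as np
import dask.array as da

hist = da.random.normal(111.51, 10, size=1000, chunks=250)
idx = np.random.randint(0, 1000, size=(4, 6))   # n-d index array

# flatten -> 1-D fancy indexing (supported) -> reshape back to n-d
picked = hist[idx.ravel()].reshape(idx.shape)
print(picked.compute().shape)                   # (4, 6)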
So my question is: is there a way to achieve the same behaviour as in my numpy code? And of course I would like better performance (i.e. faster execution time).
Answer:

Chunk random_days_panel instead of historical_data and use da.map_blocks:
def dask_way(sim_count, sim_days, hist_days):
    # shared historical data
    # on a cluster you'd load this on each worker, e.g. from a NPZ file
    historical_data = np.random.normal(111.51, 10, size=hist_days)

    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )
    future_panel = da.map_blocks(
        lambda chunk: historical_data[chunk], random_days_panel, dtype=float
    )

    future_panel.compute()

    return future_panel
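As a minimal standalone illustration of the chunk-wise lookup (toy sizes, local scheduler, names of my own choosing):

import numpy as np
import dask.array as da

lookup = np.arange(10) * 100.0                        # stand-in for historical_data
idx = da.random.randint(0, 10, size=(4, 6), chunks=(2, 3))

# map_blocks calls the lambda once per chunk; each chunk arrives as a plain
# numpy array, so ordinary numpy fancy indexing runs inside the workers
result = da.map_blocks(lambda chunk: lookup[chunk], idx, dtype=lookup.dtype)
print(result.compute())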
This delegates all the work to the workers, and it will be faster than pure numpy once your problem becomes large enough to amortize the initial (constant) cost of spinning up the scheduler and distributing the context:
hist_days = int(1e6)
sim_days = int(1e5)
sim_count = int(1000)

# dask_way(sim_count, sim_days, hist_days)
# (code in this answer)
532 ms ± 53.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# numpy_way(sim_count, sim_days, hist_days)
# (code in the question)
4.47 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# dask_way_1d(sim_count, sim_days, hist_days)
# (code in the question)
5.76 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)