如何最好地将 NetCDF 文件集合重新分块到 Zarr 数据集

Ric*_*ell 8 python python-xarray netcdf4 dask-distributed zarr

我正在尝试重新整理 NetCDF 文件集合并在 AWS S3 上创建 Zarr 数据集。我有 168 个原始 NetCDF4 经典文件,其维度数组time: 1, y: 3840, x: 4608分块为chunks={'time':1, 'y':768, 'x':922}.

我想将此输出写入 Zarr,并且我想针对时间序列提取进行优化,因此在我的块中包含更多时间记录。我想我会使用 xarray 来帮助完成工作,因为我有很多处理器可以利用 Dask,而 xarrayxr.open_mfdatasetds.to_zarr.

我第一次尝试rechunking来chunks={'time':24, 'y':768, 'x':922}匹配输入NetCDF4在分块xy,但是当我试图写Zarr它抱怨,因为它需要在两个均匀的块大小xy,只允许非均匀大小沿的最后一块time尺寸(不幸在x维度中,总大小 4608 不是块大小 922 的倍数。

然后我尝试chunks={'time':168, 'y':384, 'x':288}并开始工作,并且非常快速地进行了几分钟,然后变得越来越慢。最终在 50 分钟后,集群死亡:

4072 distributed.core - INFO - Event loop was unresponsive in Worker for 1.41s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
4073 slurmstepd: error: Step 3294889.0 exceeded memory limit (25346188 > 25165824), being killed
Run Code Online (Sandbox Code Playgroud)

这是我正在使用的代码:

from dask.distributed import Client

import pandas as pd
import xarray as xr
import s3fs
import zarr

client = Client(scheduler_file='/home/rsignell/scheduler.json')
client
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

root = '/lustre/projects/hazards/cmgp/woodshole/rsignell/nwm/forcing_short_range/' 

bucket_endpoint='https://s3.us-west-1.amazonaws.com/'

f_zarr = 'rsignell/nwm/test_week4'

dates = pd.date_range(start='2018-04-01T00:00', end='2018-04-07T23:00', freq='H')

urls = ['{}{}/nwm.t{}z.short_range.forcing.f001.conus.nc'.format(root,a.strftime('%Y%m%d'),a.strftime('%H')) for a in dates]

ds = xr.open_mfdataset(urls, concat_dim='time', chunks={'time':1, 'y':768, 'x':922})
ds = ds.drop(['ProjectionCoordinateSystem','time_bounds'])
ds = ds.chunk(chunks={'time':168, 'y':384, 'x':288}).persist()
ds
Run Code Online (Sandbox Code Playgroud)

生产

<xarray.Dataset>
Dimensions:         (reference_time: 168, time: 168, x: 4608, y: 3840)
Coordinates:
  * reference_time  (reference_time) datetime64[ns] 2018-04-01 ...
  * x               (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 -2.301e+06 ...
  * y               (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 -1.917e+06 ...
  * time            (time) datetime64[ns] 2018-04-01T01:00:00 ...
Data variables:
    T2D             (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    LWDOWN          (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    Q2D             (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    U2D             (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    V2D             (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    PSFC            (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    RAINRATE        (time, y, x) float32 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
    SWDOWN          (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
Run Code Online (Sandbox Code Playgroud)

然后我打电话

fs = s3fs.S3FileSystem(anon=False, client_kwargs=dict(endpoint_url=bucket_endpoint))
d = s3fs.S3Map(f_zarr, s3=fs)

compressor = zarr.Blosc(cname='zstd', clevel=3, shuffle=2)
encoding = {vname: {'compressor': compressor} for vname in ds.data_vars}

delayed_store = ds.to_zarr(store=d, mode='w', encoding=encoding, compute=False)
persist_store = delayed_store.persist(retries=100)
Run Code Online (Sandbox Code Playgroud)

就在它消亡之前,Dask 仪表板看起来像这样:

在此处输入图片说明 在此处输入图片说明

NetCDF4 文件的总大小为 20GB,所以我在 Dask Dashboard 中显示超过 500GB 似乎有点疯狂,而 30 个处理器每个具有 60GB RAM 不足以完成这项工作。

我做错了什么,或者什么是更好的解决方案?

小智 3

我注意到你说你想增加时间维度中的块数量。或者也许我误解了。

您从指定为 的块开始chunks={'time':1, 'y':768, 'x':922},但随后尝试chunks={'time':168, 'y':384, 'x':288}发现第二个块使用了大量内存。

问题是chunks关键字指定了块的大小,而不是块的数量

在第一种情况下,每个块的大小为1*768*922 ~ 7e5,而在第二种情况下,每个块的大小为168*384*288 ~ 2e7

时间上的最大块数是通过 实现的chunks={'time': 1}