通过 xarray.to_zarr 和 Dask 并行附加到 zarr 存储

Max*_*ime 7 python-xarray zarr

我处于一种情况,我想加载对象,将它们转换为一个xarray.Dataset并将其写入 s3 上的 zarr 存储。但是,为了更快地加载对象,我使用 Dask 集群跨不同年份并行执行,因此,也希望并行写入 s3。这里的一般结构是:

def my_task(year, store, synchronizer):
    # load objects for that year
    ...
    dataset = ...
    # I want to append this data to a zarr store on S3
    dataset.to_zarr(store, append_dim="time", synchronizer=synchronizer)


futures = []    
for y in years:
    futures.append(client.submit(my_task, y, same_store, synchronizer))
client.gather(futures)
Run Code Online (Sandbox Code Playgroud)

但是,这样做会失败:当多个工作人员同时写入时,存储会进入不一致的状态。我试过使用,zarr.sync.ProcessSynchronizer但问题仍然存在,类似于这种情况:How can one write lock a zarr store during append? 任何帮助或指导将不胜感激。谢谢!