标签: zarr

如何最好地将 NetCDF 文件集合重新分块到 Zarr 数据集

我正在尝试重新整理 NetCDF 文件集合并在 AWS S3 上创建 Zarr 数据集。我有 168 个原始 NetCDF4 经典文件,其维度数组time: 1, y: 3840, x: 4608分块为chunks={'time':1, 'y':768, 'x':922}.

我想将此输出写入 Zarr,并且我想针对时间序列提取进行优化,因此在我的块中包含更多时间记录。我想我会使用 xarray 来帮助完成工作,因为我有很多处理器可以利用 Dask,而 xarrayxr.open_mfdatasetds.to_zarr.

我第一次尝试rechunking来chunks={'time':24, 'y':768, 'x':922}匹配输入NetCDF4在分块xy,但是当我试图写Zarr它抱怨,因为它需要在两个均匀的块大小xy,只允许非均匀大小沿的最后一块time尺寸(不幸在x维度中,总大小 4608 不是块大小 922 的倍数。

然后我尝试chunks={'time':168, 'y':384, 'x':288}并开始工作,并且非常快速地进行了几分钟,然后变得越来越慢。最终在 50 分钟后,集群死亡:

4072 distributed.core - INFO - Event loop was unresponsive in Worker for 1.41s.  This is often caused by long-running …
Run Code Online (Sandbox Code Playgroud)

python python-xarray netcdf4 dask-distributed zarr

8
推荐指数
1
解决办法
1198
查看次数

通过 xarray.to_zarr 和 Dask 并行附加到 zarr 存储

我处于一种情况,我想加载对象,将它们转换为一个xarray.Dataset并将其写入 s3 上的 zarr 存储。但是,为了更快地加载对象,我使用 Dask 集群跨不同年份并行执行,因此,也希望并行写入 s3。这里的一般结构是:

def my_task(year, store, synchronizer):
    # load objects for that year
    ...
    dataset = ...
    # I want to append this data to a zarr store on S3
    dataset.to_zarr(store, append_dim="time", synchronizer=synchronizer)


futures = []    
for y in years:
    futures.append(client.submit(my_task, y, same_store, synchronizer))
client.gather(futures)
Run Code Online (Sandbox Code Playgroud)

但是,这样做会失败:当多个工作人员同时写入时,存储会进入不一致的状态。我试过使用,zarr.sync.ProcessSynchronizer但问题仍然存在,类似于这种情况:How can one write lock a zarr store during append? 任何帮助或指导将不胜感激。谢谢!

python-xarray zarr

7
推荐指数
0
解决办法
315
查看次数

将新的 Xarray DataArray 添加到现有的 Zarr 存储而不重写整个数据集?

如何在不覆盖整个内容的情况下DataArray向现有内容添加新内容?Dataset新的DataArray与现有的有一些坐标,但也有新的。在我当前的实现中,Dataset被完全覆盖,而不是仅仅添加新的东西。

现有的DataArray是分块的 zarr 支持DirectoryStore(尽管我对于 S3 存储也有同样的问题)。

import numpy as np
import xarray as xr
import zarr

arr1 = xr.DataArray(np.random.randn(2, 3),
                   [('x', ['a', 'b']), ('y', [10, 20, 30])],
                   name='arr1')

ds = arr1.chunk({'x': 1, 'y': 3}).to_dataset()
Run Code Online (Sandbox Code Playgroud)

ds看起来像这样:

<xarray.Dataset>
Dimensions:  (x: 2, y: 3)
Coordinates:
  * x        (x) <U1 'a' 'b'
  * y        (y) int64 10 20 30
Data variables:
    arr1     (x, y) float64 dask.array<shape=(2, 3), chunksize=(1, 3)>
Run Code Online (Sandbox Code Playgroud)

我将其写入目录存储:

store …
Run Code Online (Sandbox Code Playgroud)

dask python-xarray zarr

5
推荐指数
2
解决办法
4227
查看次数

如何减少/删除 zarr 数组

我在 zarr 中有一个简单的对象数组(比如长度为 1000)。我想用一个精简的版本替换它,只选择项目的一个子集,如使用大小为 1000 的布尔数组指定的那样。我想保持其他所有内容相同(例如,如果这个数组是一个持久的数组,我想更改磁盘和内存中的阵列)。我不能简单地重新分配数组:

my_zarr_data = my_zarr_data[:][selected_items]

因为那时我得到了错误ValueError: missing object_codec for object array。另一种选择是制作一个副本,删除所有数据,然后使用 将其从原始数据中添加回来append(),但我看不到如何清除zarr 数组同时保持 object_codec 和其他参数相同(也许我可以这样做resize(0)?)。目前我正在调整大小sum(selected_items),然后使用my_zarr_data.set_basic_selection(..., my_zarr_data[:][selected_items]). 那正确吗?是否有更有效的方法将数组永久重新分配给(例如)来自 的返回值get_mask_selection()

zarr

5
推荐指数
0
解决办法
282
查看次数

在 zarr 数组上创建一个生成器,并为 pytorch 数据加载器提供开始和结束

我正在开发一个 pytorch 项目,我的数据保存在zarr.

随机访问的zarr成本很高,但由于zarr使用了块式缓存,迭代速度非常快。为了利用这一事实,我与多个工作人员一起使用IterableDataset

class Data(IterableDataset):
    def __init__(self, path, start=None, end=None):
        super(Data, self).__init__()
        store = zarr.DirectoryStore(path)
        self.array = zarr.open(store, mode='r')

        if start is None:
            start = 0
        if end is None:
            end = self.array.shape[0]

        assert end > start

        self.start = start
        self.end = end

    def __iter__(self):
        return islice(self.array, self.start, self.end)
Run Code Online (Sandbox Code Playgroud)

问题是,对于连续的工作人员,随着行的顺序,自然self.array会变得更大,创建生成器会在我的训练/验证过程中花费大量时间,因为仍然需要迭代不需要的元素,直到它到达。一旦为每个工作人员创建了一个生成器,这就像一个魅力,但实现这一目标需要很长时间。10e9self.startself.enditertools.islice(array, start, end)islicestart

有没有更好的方法来创建这样的生成器?或者也许有更聪明的方法来zarr使用pytorch

python arrays pytorch zarr

5
推荐指数
1
解决办法
936
查看次数

ValueError:无法识别的引擎 zarr 必须是以下之一:['scipy', 'store']

我正在尝试打开 zarr 文件,

import pandas as pd
import xarray as xr
xf = xr.open_zarr("../../data/processed/geolink_norge_dataset/geolink_norge_well_logs.zarr")
Run Code Online (Sandbox Code Playgroud)

但出现错误:


ValueError                                Traceback (most recent call last) <ipython-input-17-ff38d9c54463> in <module>
      1 import pandas as pd
      2 import xarray as xr
----> 3 xf = xr.open_zarr("../../data/processed/geolink_norge_dataset/geolink_norge_well_logs.zarr")
      4 
      5 # We will use just the 30* wells

C:\ProgramData\Anaconda3\lib\site-packages\xarray\backends\zarr.py in open_zarr(store, group, synchronizer, chunks, decode_cf, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, consolidated, overwrite_encoded_chunks, chunk_store, storage_options, decode_timedelta, use_cftime, **kwargs)
    685     }
    686 
--> 687     ds = open_dataset(
    688         filename_or_obj=store,
    689         group=group, …
Run Code Online (Sandbox Code Playgroud)

scipy python-xarray zarr

5
推荐指数
1
解决办法
8620
查看次数

使用 Zarr 存储 Dask Array 占用太多内存

我有一长串 .zarr 数组,我想将它们合并到一个数组中并写入磁盘。

我的代码大致如下:

import dask.array
import zarr
import os

local_paths = ['parts/X_00000000.zarr',
 'parts/X_00000001.zarr',
 'parts/X_00000002.zarr',
 'parts/X_00000003.zarr',
 'parts/X_00000004.zarr',
 'parts/X_00000005.zarr',
 ...]

result_path = "testtest"
os.makedirs(result_path)

Xs = [dask.array.from_zarr(zarr.DirectoryStore(p)) for p in local_paths]
X = dask.array.concatenate(Xs, axis=0)
X = X.rechunk({0: 10000, 1:-1, 2:-1, 3:-1})
dask.array.to_zarr(X, zarr.DirectoryStore(result_path))
Run Code Online (Sandbox Code Playgroud)

来自的每个数组都local_paths包含一个 64x64 图像列表。这些列表的长度各不相同。所以第一个的形状可能是(100, 64, 64, 3),第二个的形状可能是(200, 64, 64, 3)

执行此代码的最后一行,导致我的内存完全消耗,然后 Jupyter 笔记本完全崩溃(没有给我错误消息或异常)。

为了调查问题,我打印了任务图,因此用以下两行替换了最后一行:

k = dask.array.to_zarr(X, zarr.DirectoryStore(result_path), compute=False)
k.visualize()
Run Code Online (Sandbox Code Playgroud)

它非常大(链接),所以我只截取了其中两个有趣的部分: 在此处输入图片说明

这种结构一直重复。Dask 获取连接的输出,重新分配数据,然后尝试存储它。请注意作为重叠过渡结果的粗黑条。

现在看看这些转换来自哪里:

在此处输入图片说明

查看create中间的节点。我假设这是图中创建 zarr DirectoryStore 的部分。 …

python dask zarr

5
推荐指数
0
解决办法
186
查看次数

获取 zarr 数组切片的视图

我想,以产生zarr阵列指向部分磁盘上的zarr阵列,类似于如何sliced = np_arr[5]给我的视图成np_arr,使得在修改该数据sliced修改的数据np_arr。示例代码:

import matplotlib.pyplot as plt
import numpy as np
import zarr


arr = zarr.open(
    'temp.zarr',
    mode='a',
    shape=(4, 32, 32),
    chunks=(1, 16, 16),
    dtype=np.float32,
)
arr[:] = np.random.random((4, 32, 32))

fig, ax = plt.subplots(1, 2)
arr[2, ...] = 0  # works fine, "wipes" slice 2
ax[0].imshow(arr[2])  # all 0s

arr_slice = arr[1]  # returns a NumPy array — loses ties to zarr on disk
arr_slice[:] = 0
ax[1].imshow(arr[1])  # …
Run Code Online (Sandbox Code Playgroud)

python numpy zarr

3
推荐指数
1
解决办法
328
查看次数

异步 Xarray 写入 Zarr

全部。我正在使用 Dask 分布式集群在循环内编写 Zarr+Dask 支持的 Xarray 数据集,并且dataset.to_zarr正在阻塞。当存在阻碍循环继续的散乱块时,这确实会减慢速度。有没有办法.to_zarr异步执行,以便循环可以继续下一个数据集写入而不会被一些落后的块所阻碍?

dask python-xarray zarr

2
推荐指数
1
解决办法
380
查看次数

如何使用 dask 和 xarray 加载和处理 zarr 文件

我在 s3 中有每月的 zarr 文件,这些文件具有网格化的温度数据。我想为一个纬度/经度提取多个月的数据并创建该时间序列的数据框。一些伪代码:

datasets=[]
for file in files:
    s3 = s3fs.S3FileSystem()        
    zarr_store = s3fs.S3Map(file, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)
    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d"))
                       )
    datasets.append(ds)

con = xr.concat(datasets, dim='time')
df = con.to_dataframe()
Run Code Online (Sandbox Code Playgroud)

所以这段代码可以工作,但速度非常慢。我希望使用 dask 来加快速度。我的计划是更改一次处理一个文件并返回一个数据帧的方法。然后我会调用 client.map() 并生成所有 dfs,然后在最后将它们连接在一起。所以我结束了类似的事情:

def load(file, lat: float, long: float, start_date, end_date):

    s3 = s3fs.S3FileSystem()
    s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
    zarr_store = s3fs.S3Map(s3_path, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)

    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d"))
                 )

    tmp = x.result().to_array().values
    df_time = …
Run Code Online (Sandbox Code Playgroud)

python dask python-xarray zarr

1
推荐指数
1
解决办法
777
查看次数

zarr 不尊重 xarray 的块大小并恢复到原始块大小

我正在打开一个 zarr 文件,然后将其重新分块,然后将其写回另一个 zarr 存储。然而,当我重新打开它时,它不尊重我之前编写的块大小。这是 jupyter 的代码和输出。知道我在这里做错了什么吗?

bathy_ds = xr.open_zarr('data/bathy_store')
bathy_ds.elevation
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

bathy_ds.chunk(5000).elevation
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

bathy_ds.chunk(5000).to_zarr('data/elevation_store')
new_ds = xr.open_zarr('data/elevation_store')
new_ds.elevation
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

它正在恢复到原始分块,好像我没有完全覆盖它或更改其他一些需要更改的设置。

python python-xarray zarr

1
推荐指数
1
解决办法
107
查看次数

使用 Zarr 存储 1TB 随机数据的有效方式

我想在磁盘阵列上存储由 zarr 支持的 1TB 随机数据。目前,我正在做如下事情:

import numpy as np
import zarr
from numcodecs import Blosc

compressor = Blosc(cname='lz4', clevel=5, shuffle=Blosc.BITSHUFFLE)
store = zarr.DirectoryStore('TB1.zarr')
root = zarr.group(store)
TB1 = root.zeros('data',
           shape=(1_000_000, 1_000_000),
           chunks=(20_000, 5_000),
           compressor=compressor,
           dtype='|i2')

for i in range(1_000_000): 
    TB1[i, :1_000_000] = np.random.randint(0, 3, size=1_000_000, dtype='|i2')
Run Code Online (Sandbox Code Playgroud)

这将需要一些时间——我知道如果我不总是生成1_000_000随机数而是重用数组,事情可能会得到改善,但我现在想要更多的随机性。有没有更好的方法来构建这个随机数据集?

更新 1

使用更大的 numpy 块可以加快速度:

for i in range(0, 1_000_000, 100_000): 
    TB1[i:i+100_000, :1_000_000] = np.random.randint(0, 3, size=(100_000, 1_000_000), dtype='|i2')
Run Code Online (Sandbox Code Playgroud)

numpy zarr

0
推荐指数
1
解决办法
155
查看次数

标签 统计

zarr ×12

python-xarray ×7

python ×6

dask ×4

numpy ×2

arrays ×1

dask-distributed ×1

netcdf4 ×1

pytorch ×1

scipy ×1