我正在尝试重新整理 NetCDF 文件集合并在 AWS S3 上创建 Zarr 数据集。我有 168 个原始 NetCDF4 经典文件,其维度数组time: 1, y: 3840, x: 4608
分块为chunks={'time':1, 'y':768, 'x':922}
.
我想将此输出写入 Zarr,并且我想针对时间序列提取进行优化,因此在我的块中包含更多时间记录。我想我会使用 xarray 来帮助完成工作,因为我有很多处理器可以利用 Dask,而 xarrayxr.open_mfdataset
和ds.to_zarr
.
我第一次尝试rechunking来chunks={'time':24, 'y':768, 'x':922}
匹配输入NetCDF4在分块x
和y
,但是当我试图写Zarr它抱怨,因为它需要在两个均匀的块大小x
和y
,只允许非均匀大小沿的最后一块time
尺寸(不幸在x
维度中,总大小 4608 不是块大小 922 的倍数。
然后我尝试chunks={'time':168, 'y':384, 'x':288}
并开始工作,并且非常快速地进行了几分钟,然后变得越来越慢。最终在 50 分钟后,集群死亡:
4072 distributed.core - INFO - Event loop was unresponsive in Worker for 1.41s. This is often caused by long-running …
Run Code Online (Sandbox Code Playgroud) 我处于一种情况,我想加载对象,将它们转换为一个xarray.Dataset
并将其写入 s3 上的 zarr 存储。但是,为了更快地加载对象,我使用 Dask 集群跨不同年份并行执行,因此,也希望并行写入 s3。这里的一般结构是:
def my_task(year, store, synchronizer):
# load objects for that year
...
dataset = ...
# I want to append this data to a zarr store on S3
dataset.to_zarr(store, append_dim="time", synchronizer=synchronizer)
futures = []
for y in years:
futures.append(client.submit(my_task, y, same_store, synchronizer))
client.gather(futures)
Run Code Online (Sandbox Code Playgroud)
但是,这样做会失败:当多个工作人员同时写入时,存储会进入不一致的状态。我试过使用,zarr.sync.ProcessSynchronizer
但问题仍然存在,类似于这种情况:How can one write lock a zarr store during append?
任何帮助或指导将不胜感激。谢谢!
如何在不覆盖整个内容的情况下DataArray
向现有内容添加新内容?Dataset
新的DataArray
与现有的有一些坐标,但也有新的。在我当前的实现中,Dataset
被完全覆盖,而不是仅仅添加新的东西。
现有的DataArray
是分块的 zarr 支持DirectoryStore
(尽管我对于 S3 存储也有同样的问题)。
import numpy as np
import xarray as xr
import zarr
arr1 = xr.DataArray(np.random.randn(2, 3),
[('x', ['a', 'b']), ('y', [10, 20, 30])],
name='arr1')
ds = arr1.chunk({'x': 1, 'y': 3}).to_dataset()
Run Code Online (Sandbox Code Playgroud)
ds
看起来像这样:
<xarray.Dataset>
Dimensions: (x: 2, y: 3)
Coordinates:
* x (x) <U1 'a' 'b'
* y (y) int64 10 20 30
Data variables:
arr1 (x, y) float64 dask.array<shape=(2, 3), chunksize=(1, 3)>
Run Code Online (Sandbox Code Playgroud)
我将其写入目录存储:
store …
Run Code Online (Sandbox Code Playgroud) 我在 zarr 中有一个简单的对象数组(比如长度为 1000)。我想用一个精简的版本替换它,只选择项目的一个子集,如使用大小为 1000 的布尔数组指定的那样。我想保持其他所有内容相同(例如,如果这个数组是一个持久的数组,我想更改磁盘和内存中的阵列)。我不能简单地重新分配数组:
my_zarr_data = my_zarr_data[:][selected_items]
因为那时我得到了错误ValueError: missing object_codec for object array
。另一种选择是制作一个副本,删除所有数据,然后使用 将其从原始数据中添加回来append()
,但我看不到如何清除zarr 数组同时保持 object_codec 和其他参数相同(也许我可以这样做resize(0)
?)。目前我正在调整大小sum(selected_items)
,然后使用my_zarr_data.set_basic_selection(..., my_zarr_data[:][selected_items])
. 那正确吗?是否有更有效的方法将数组永久重新分配给(例如)来自 的返回值get_mask_selection()
?
我正在开发一个 pytorch 项目,我的数据保存在zarr
.
随机访问的zarr
成本很高,但由于zarr
使用了块式缓存,迭代速度非常快。为了利用这一事实,我与多个工作人员一起使用IterableDataset
:
class Data(IterableDataset):
def __init__(self, path, start=None, end=None):
super(Data, self).__init__()
store = zarr.DirectoryStore(path)
self.array = zarr.open(store, mode='r')
if start is None:
start = 0
if end is None:
end = self.array.shape[0]
assert end > start
self.start = start
self.end = end
def __iter__(self):
return islice(self.array, self.start, self.end)
Run Code Online (Sandbox Code Playgroud)
问题是,对于连续的工作人员,随着行的顺序,自然self.array
会变得更大,创建生成器会在我的训练/验证过程中花费大量时间,因为仍然需要迭代不需要的元素,直到它到达。一旦为每个工作人员创建了一个生成器,这就像一个魅力,但实现这一目标需要很长时间。10e9
self.start
self.end
itertools.islice(array, start, end)
islice
start
有没有更好的方法来创建这样的生成器?或者也许有更聪明的方法来zarr
使用pytorch
?
我正在尝试打开 zarr 文件,
import pandas as pd
import xarray as xr
xf = xr.open_zarr("../../data/processed/geolink_norge_dataset/geolink_norge_well_logs.zarr")
Run Code Online (Sandbox Code Playgroud)
但出现错误:
ValueError Traceback (most recent call last) <ipython-input-17-ff38d9c54463> in <module>
1 import pandas as pd
2 import xarray as xr
----> 3 xf = xr.open_zarr("../../data/processed/geolink_norge_dataset/geolink_norge_well_logs.zarr")
4
5 # We will use just the 30* wells
C:\ProgramData\Anaconda3\lib\site-packages\xarray\backends\zarr.py in open_zarr(store, group, synchronizer, chunks, decode_cf, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, consolidated, overwrite_encoded_chunks, chunk_store, storage_options, decode_timedelta, use_cftime, **kwargs)
685 }
686
--> 687 ds = open_dataset(
688 filename_or_obj=store,
689 group=group, …
Run Code Online (Sandbox Code Playgroud) 我有一长串 .zarr 数组,我想将它们合并到一个数组中并写入磁盘。
我的代码大致如下:
import dask.array
import zarr
import os
local_paths = ['parts/X_00000000.zarr',
'parts/X_00000001.zarr',
'parts/X_00000002.zarr',
'parts/X_00000003.zarr',
'parts/X_00000004.zarr',
'parts/X_00000005.zarr',
...]
result_path = "testtest"
os.makedirs(result_path)
Xs = [dask.array.from_zarr(zarr.DirectoryStore(p)) for p in local_paths]
X = dask.array.concatenate(Xs, axis=0)
X = X.rechunk({0: 10000, 1:-1, 2:-1, 3:-1})
dask.array.to_zarr(X, zarr.DirectoryStore(result_path))
Run Code Online (Sandbox Code Playgroud)
来自的每个数组都local_paths
包含一个 64x64 图像列表。这些列表的长度各不相同。所以第一个的形状可能是(100, 64, 64, 3)
,第二个的形状可能是(200, 64, 64, 3)
。
执行此代码的最后一行,导致我的内存完全消耗,然后 Jupyter 笔记本完全崩溃(没有给我错误消息或异常)。
为了调查问题,我打印了任务图,因此用以下两行替换了最后一行:
k = dask.array.to_zarr(X, zarr.DirectoryStore(result_path), compute=False)
k.visualize()
Run Code Online (Sandbox Code Playgroud)
它非常大(链接),所以我只截取了其中两个有趣的部分:
这种结构一直重复。Dask 获取连接的输出,重新分配数据,然后尝试存储它。请注意作为重叠过渡结果的粗黑条。
现在看看这些转换来自哪里:
查看create
中间的节点。我假设这是图中创建 zarr DirectoryStore 的部分。 …
我想,以产生zarr阵列指向部分磁盘上的zarr阵列,类似于如何sliced = np_arr[5]
给我的视图成np_arr
,使得在修改该数据sliced
修改的数据np_arr
。示例代码:
import matplotlib.pyplot as plt
import numpy as np
import zarr
arr = zarr.open(
'temp.zarr',
mode='a',
shape=(4, 32, 32),
chunks=(1, 16, 16),
dtype=np.float32,
)
arr[:] = np.random.random((4, 32, 32))
fig, ax = plt.subplots(1, 2)
arr[2, ...] = 0 # works fine, "wipes" slice 2
ax[0].imshow(arr[2]) # all 0s
arr_slice = arr[1] # returns a NumPy array — loses ties to zarr on disk
arr_slice[:] = 0
ax[1].imshow(arr[1]) # …
Run Code Online (Sandbox Code Playgroud) 全部。我正在使用 Dask 分布式集群在循环内编写 Zarr+Dask 支持的 Xarray 数据集,并且dataset.to_zarr
正在阻塞。当存在阻碍循环继续的散乱块时,这确实会减慢速度。有没有办法.to_zarr
异步执行,以便循环可以继续下一个数据集写入而不会被一些落后的块所阻碍?
我在 s3 中有每月的 zarr 文件,这些文件具有网格化的温度数据。我想为一个纬度/经度提取多个月的数据并创建该时间序列的数据框。一些伪代码:
datasets=[]
for file in files:
s3 = s3fs.S3FileSystem()
zarr_store = s3fs.S3Map(file, s3=s3)
zarr = xr.open_zarr(store=zarr_store, consolidated=True)
ds = zarr.sel(latitude=lat,
longitude=long,
time=slice(start_date.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d"))
)
datasets.append(ds)
con = xr.concat(datasets, dim='time')
df = con.to_dataframe()
Run Code Online (Sandbox Code Playgroud)
所以这段代码可以工作,但速度非常慢。我希望使用 dask 来加快速度。我的计划是更改一次处理一个文件并返回一个数据帧的方法。然后我会调用 client.map() 并生成所有 dfs,然后在最后将它们连接在一起。所以我结束了类似的事情:
def load(file, lat: float, long: float, start_date, end_date):
s3 = s3fs.S3FileSystem()
s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
zarr_store = s3fs.S3Map(s3_path, s3=s3)
zarr = xr.open_zarr(store=zarr_store, consolidated=True)
ds = zarr.sel(latitude=lat,
longitude=long,
time=slice(start_date.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d"))
)
tmp = x.result().to_array().values
df_time = …
Run Code Online (Sandbox Code Playgroud) 我正在打开一个 zarr 文件,然后将其重新分块,然后将其写回另一个 zarr 存储。然而,当我重新打开它时,它不尊重我之前编写的块大小。这是 jupyter 的代码和输出。知道我在这里做错了什么吗?
bathy_ds = xr.open_zarr('data/bathy_store')
bathy_ds.elevation
Run Code Online (Sandbox Code Playgroud)
bathy_ds.chunk(5000).elevation
Run Code Online (Sandbox Code Playgroud)
bathy_ds.chunk(5000).to_zarr('data/elevation_store')
new_ds = xr.open_zarr('data/elevation_store')
new_ds.elevation
Run Code Online (Sandbox Code Playgroud)
它正在恢复到原始分块,好像我没有完全覆盖它或更改其他一些需要更改的设置。
我想在磁盘阵列上存储由 zarr 支持的 1TB 随机数据。目前,我正在做如下事情:
import numpy as np
import zarr
from numcodecs import Blosc
compressor = Blosc(cname='lz4', clevel=5, shuffle=Blosc.BITSHUFFLE)
store = zarr.DirectoryStore('TB1.zarr')
root = zarr.group(store)
TB1 = root.zeros('data',
shape=(1_000_000, 1_000_000),
chunks=(20_000, 5_000),
compressor=compressor,
dtype='|i2')
for i in range(1_000_000):
TB1[i, :1_000_000] = np.random.randint(0, 3, size=1_000_000, dtype='|i2')
Run Code Online (Sandbox Code Playgroud)
这将需要一些时间——我知道如果我不总是生成1_000_000
随机数而是重用数组,事情可能会得到改善,但我现在想要更多的随机性。有没有更好的方法来构建这个随机数据集?
使用更大的 numpy 块可以加快速度:
for i in range(0, 1_000_000, 100_000):
TB1[i:i+100_000, :1_000_000] = np.random.randint(0, 3, size=(100_000, 1_000_000), dtype='|i2')
Run Code Online (Sandbox Code Playgroud)