TensorFlow - tf.data.Dataset读取大型HDF5文件

ver*_*man 17 python video hdf5 tensorflow tensorflow-datasets

我正在设置一个TensorFlow管道,用于读取大型HDF5文件作为我的深度学习模型的输入.每个HDF5文件包含100个可变大小长度的视频,存储为压缩JPG图像的集合(以使磁盘上的大小可管理).使用tf.data.Dataset和映射tf.py_func,使用自定义Python逻辑从HDF5文件中读取示例非常简单.例如:

def read_examples_hdf5(filename, label):
    with h5py.File(filename, 'r') as hf:
        # read frames from HDF5 and decode them from JPG
    return frames, label

filenames = glob.glob(os.path.join(hdf5_data_path, "*.h5"))
labels = [0]*len(filenames) # ... can we do this more elegantly?

dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(
    lambda filename, label: tuple(tf.py_func(
        read_examples_hdf5, [filename, label], [tf.uint8, tf.int64]))
)

dataset = dataset.shuffle(1000 + 3 * BATCH_SIZE)
dataset = dataset.batch(BATCH_SIZE)
iterator = dataset.make_one_shot_iterator()
next_batch = iterator.get_next()
Run Code Online (Sandbox Code Playgroud)

这个例子有效,但问题是它似乎tf.py_func一次只能处理一个例子.由于我的HDF5容器存储了100个示例,因此这种限制会导致显着的开销,因为文件经常需要打开,读取,关闭和重新打开.将所有100个视频示例读入数据集对象然后继续使用下一个HDF5文件(最好是在多个线程中,每个线程处理它自己的HDF5文件集合)会更有效率.

所以,我想要的是在后台运行的一些线程,从HDF5文件中读取视频帧,从JPG解码它们,然后将它们提供给数据集对象.在引入tf.data.Dataset管道之前,使用RandomShuffleQueueenqueue_manyops 很容易,但似乎目前还没有优雅的方法(或文档缺乏).

有谁知道实现目标的最佳方式是什么?我也使用tfrecord文件研究(并实现)了管道,但是随机抽取存储在tfrecord文件中的视频帧似乎是不可能的(见这里).另外,我已经查看了from_generator()输入,tf.data.Dataset但这似乎不会在多个线程中运行.任何建议都非常受欢迎.

mik*_*ola 16

在处理类似问题时,我偶然发现了这个问题.我提出了一个基于使用Python生成器的解决方案,以及TF数据集构造方法from_generator.因为我们使用生成器,所以HDF5文件应该只打开一次并且只要有要读取的条目就保持打开状态.因此,不会打开,读取,然后关闭每次调用以获取下一个数据元素.

发电机定义

为了允许用户传入HDF5文件名作为参数,我生成了一个具有__call__方法的类,因为from_generator指定了生成器必须是可调用的.这是发电机:

import h5py
import tensorflow as tf

class generator:
    def __init__(self, file):
        self.file = file

    def __call__(self):
        with h5py.File(self.file, 'r') as hf:
            for im in hf["train_img"]:
                yield im
Run Code Online (Sandbox Code Playgroud)

通过使用生成器,代码应该从上次返回结果时的每次调用中从中断处开始,而不是从头开始再次运行.在这种情况下,它在内for循环的下一次迭代中.所以这应该跳过再次打开文件进行读取,只要有数据就保持打开状态yield.有关发电机的更多信息,请参阅此优秀问答.

当然,您必须替换with块内的任何内容,以匹配数据集的构造方式以及要获取的输出.

用法示例

ds = tf.data.Dataset.from_generator(
    generator(hdf5_path), 
    tf.uint8, 
    tf.TensorShape([427,561,3]))

value = ds.make_one_shot_iterator().get_next()

# Example on how to read elements
while True:
    try:
        data = sess.run(value)
        print(data.shape)
    except tf.errors.OutOfRangeError:
        print('done.')
        break
Run Code Online (Sandbox Code Playgroud)

同样,在我的情况下,我在我的数据集中存储uint8了高度427,宽度5613颜色通道的图像,因此您需要在上面的调用中修改这些图像以匹配您的用例.

处理多个文件

我有一个处理多个HDF5文件的建议解决方案.基本思想是Dataset像往常一样从文件名构造一个,然后使用该interleave方法同时处理许多输入文件,例如从每个输入文件中获取样本以形成批处理.

这个想法如下:

ds = tf.data.Dataset.from_tensor_slices(filenames)
# You might want to shuffle() the filenames here depending on the application
ds = ds.interleave(lambda filename: tf.data.Dataset.from_generator(
        generator(filename), 
        tf.uint8, 
        tf.TensorShape([427,561,3])),
       cycle_length, block_length)
Run Code Online (Sandbox Code Playgroud)

这样做的同时打开cycle_length文件,并block_length在移动到下一个文件之前从每个文件中生成项目 - interleave有关详细信息,请参阅文档.您可以在此处设置值以匹配适合您的应用程序的值:例如,您是需要一次处理一个文件还是同时处理多个文件,您是否只想从每个文件一次处理一个样本,依此类推.

编辑:对于并行版本,请看一下tf.contrib.data.parallel_interleave!

可能的警告

from_generator如果您决定使用该解决方案,请注意使用的特殊性.对于Tensorflow 1.6.0,文档from_generator提到了这两个注释.

在不同环境或分布式培训中应用此方法可能具有挑战性:

注意:Dataset.from_generator()的当前实现使用tf.py_func并继承相同的约束.特别是,它需要将数据集和迭代器相关的操作放在与调用Dataset.from_generator()的Python程序相同的进程中的设备上.生成器的主体不会在GraphDef中序列化,如果需要序列化模型并在不同的环境中恢复它,则不应使用此方法.

如果发电机依赖于外部状态,请小心:

注意:如果生成器依赖于可变全局变量或其他外部状态,请注意运行时可能多次调用生成器(为了支持重复数据集)以及在调用Dataset.from_generator()和生成之间的任何时间.来自发电机的第一个元素.变异全局变量或外部状态可能导致未定义的行为,我们建议您在调用Dataset.from_generator()之前显式缓存生成器中的任何外部状态.

  • 我对此有一个后续问题。单个 HDF5 文件的示例工作正常,但我无法让多示例与交错一起使用。问题是 tf.data.Dataset.from_tensor_slices(filenames) 返回 Tensor 对象的集合而不是 Python 字符串,因此生成器无法处理这个。处理这个问题的正确方法是什么? (2认同)

小智 6

我花了一段时间才弄明白,所以我想我应该把它记录在这里。根据 mikkola 的回答,这是处理多个文件的方法:

import h5py
import tensorflow as tf

class generator:
    def __call__(self, file):
        with h5py.File(file, 'r') as hf:
            for im in hf["train_img"]:
                yield im

ds = tf.data.Dataset.from_tensor_slices(filenames)
ds = ds.interleave(lambda filename: tf.data.Dataset.from_generator(
        generator(), 
        tf.uint8, 
        tf.TensorShape([427,561,3]),
        args=(filename,)),
       cycle_length, block_length)
Run Code Online (Sandbox Code Playgroud)

关键是你不能filename直接传递给generator,因为它是一个Tensor. 你必须通过它args,tensorflow 评估并将其转换为常规的 python 变量。