如何提高数据输入管道的性能?

Ale*_*NON 23 python python-3.x tensorflow tensorflow-datasets tensorflow2.0

我尝试优化我的数据输入管道。该数据集是一组 450 个 TFRecord 文件,每个文件大小约为 70MB,托管在 GCS 上。该作业使用 GCP ML Engine 执行。没有 GPU。

这是管道:

def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        buffer_size=2048
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).map(
        map_func=_parse_example_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).prefetch(
        buffer_size=1
    )
Run Code Online (Sandbox Code Playgroud)

使用映射函数:

def _bit_to_float(string_batch: tf.Tensor):
    return tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(
        tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
        tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
    ), tf.float32), 2), (tf.shape(string_batch)[0], -1))


def _parse_example_batch(example_batch):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = _bit_to_float(samples["booleanFeatures"])
    return (
        tf.concat([dense_float, bits_to_float], 1),
        tf.reshape(samples["label"], (-1, 1))
    )
Run Code Online (Sandbox Code Playgroud)

我尝试遵循数据管道教程的最佳实践,并对我的映射函数进行矢量化(按照mrry 的建议)。

使用此设置,当数据以高速下载(带宽约为 200MB/s)时,CPU 使用率不足(14%)并且训练非常缓慢(一个 epoch 超过 1 小时)。

我尝试了一些参数配置,更改了interleave()诸如或 之类的参数num_parallel_calls或诸如.cycle_lengthTFRecordDatasetnum_parallel_calls

最快的配置使用这组参数:

  • interleave.num_parallel_calls: 1
  • interleave.cycle_length: 8
  • TFRecordDataset.num_parallel_calls: 8

有了这个,一个 epoch 只需大约 20 分钟即可运行。但是,CPU 使用率只有 50%,而带宽消耗约为 55MB/s

问题:

  1. 如何优化管道以达到 100% 的 CPU 使用率(以及 100MB/s 的带宽消耗)?
  2. 为什么tf.data.experimental.AUTOTUNE没有找到加速训练的最佳值?

亲切的,亚历克西斯。


编辑

经过一些更多的实验,我得出了以下解决方案。

  1. 删除if大于0interleave已经处理的步骤。TFRecordDatasetnum_parallel_calls
  2. 将映射函数更新为仅执行parse_exampleand decode_raw,返回元组 `((, ), ())
  3. cache 之后 map
  4. _bit_to_float函数作为模型的组件移动

最后,这里是数据管道代码:

def build_dataset(file_pattern):
    return tf.data.TFRecordDataset(
        tf.data.Dataset.list_files(file_pattern),
        num_parallel_reads=multiprocessing.cpu_count(),
        buffer_size=70*1000*1000
    ).shuffle(
        buffer_size=2048
    ).map(
        map_func=split,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).prefetch(
        buffer_size=32
    )


def split(example):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (dense_float, bits_to_float),
        tf.reshape(samples["label"], (1,))
    )


def build_model(input_shape):
    feature = keras.Input(shape=(N,))
    bool_feature = keras.Input(shape=(M,), dtype="uint8")
    one_hot = dataset._bit_to_float(bool_feature)
    dense_input = tf.reshape(
        keras.backend.concatenate([feature, one_hot], 1),
        input_shape)
    output = actual_model(dense_input)

    model = keras.Model([feature, bool_feature], output)
    return model

def _bit_to_float(string_batch: tf.Tensor):
    return tf.dtypes.cast(tf.reshape(
        tf.bitwise.bitwise_and(
            tf.bitwise.right_shift(
                tf.expand_dims(string_batch, 2),
                tf.reshape(
                    tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
                    (1, 1, 8)
                ),
            ),
            tf.constant(0x01, dtype=tf.uint8)
        ),
        (tf.shape(string_batch)[0], -1)
    ), tf.float32)
Run Code Online (Sandbox Code Playgroud)

感谢所有这些优化:

  • 带宽消耗在90MB/s左右
  • CPU使用率在20%左右
  • 第一个 epoch 花费 20 分钟
  • 连续的 epoch 每个花费 5 分钟

所以这似乎是一个很好的第一次设置。但是 CPU 和 BW 仍然没有被过度使用,所以仍然欢迎任何建议!


编辑之二

因此,经过一些基准测试后,我发现了我认为最好的输入管道:

def build_dataset(file_pattern):
    tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )
Run Code Online (Sandbox Code Playgroud)

那么,什么是新的:

  • 根据this GitHub issueTFRecordDataset交错是遗留的,因此interleave功能更好。
  • batchbeforemap是一个好习惯(向量化您的函数)并减少调用映射函数的次数。
  • 不需要repeat了。从 TF2.0 开始,Keras 模型 API 支持数据集 API 并且可以使用缓存(参见SO 帖子
  • 从 a 切换VarLenFeature到 a FixedLenSequenceFeature,删除对 的无用调用tf.sparse.to_dense

希望这能有所帮助。仍然欢迎提供建议。

Ten*_*ort 13

为了社区的利益,在回答部分提及@AlexisBRENON 的解决方案和重要意见。

下面提到的是重要的观察结果:

  1. 根据this GitHub issue,这TFRecordDataset interleaving是一个遗留问题,因此interleave功能更好。
  2. batchbeforemap是一个好习惯(向量化您的函数)并减少调用映射函数的次数。
  3. 不需要repeat了。从 TF2.0 开始,Keras 模型 API 支持数据集 API 并且可以使用缓存(参见SO 帖子
  4. 从 a 切换VarLenFeature到 a FixedLenSequenceFeature,删除对 的无用调用tf.sparse.to_dense

下面提到了流水线的代码,具有改进的性能,符合上述观察:

def build_dataset(file_pattern):
    tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )
Run Code Online (Sandbox Code Playgroud)