将大型培训和测试文件流式传输到Tensorflow的DNNClassifier中

aec*_*aec 10 python csv tensorflow

我有一个巨大的训练CSV文件(709M)和一个大型测试CSV文件(125M),我想DNNClassifier在使用高级Tensorflow API的上下文中发送到该文件.

似乎input_fnparam接受fit并且evaluate必须在内存中保存所有功能和标签数据,但我现在想在我的本地计算机上运行它,因此如果我将这些文件读入内存并且因此它会很快耗尽内存然后处理它们.

我浏览了关于数据流式读取的文档,但是用于读取CSV的示例代码似乎是针对低级别的Tensorflow API.

并且 - 如果你原谅了一些抱怨 - 对于将训练有素的训练和测试数据文件发送到Estimator...... 的微不足道的用例来说似乎过于复杂......尽管,培训实际上可能需要这种复杂程度并在Tensorflow中测试大量数据?

在任何情况下,我都非常感谢使用高级API的方法,如果它甚至可能,我开始怀疑.

在探索之后,我确实找到了DNNClassifier#partial_fit,并试图用它进行训练.

如何使用这种方法的例子可以节省一些时间,但希望我会在接下来的几个小时内偶然发现正确的用法.

然而,似乎没有相应的DNNClassifier#partial_evaluate...虽然我怀疑我可以将测试数据分解成更小的部分并DNNClassifier#evaluate在每个批次上连续运行,这实际上可能是一个很好的方法,因为我可以分段将测试数据分组到队列中,从而获得每队列的准确性.

====更新====

精简版:

  1. DomJack的建议应该是公认的答案.

  2. 但是,我的Mac的16GB RAM足以将整个709Mb的训练数据集保存在内存中而不会崩溃.因此,虽然在最终部署应用程序时我将使用DataSet功能,但我还没有将它用于本地开发工作.

更长的版本:

我开始使用partial_fit如上所述的API,但在每次使用时都会发出警告.

所以,我去看看这里的方法的源代码,并发现它的完整实现如下所示:

logging.warning('The current implementation of partial_fit is not optimized'
                ' for use in a loop. Consider using fit() instead.')
return self.fit(x=x, y=y, input_fn=input_fn, steps=steps,
                batch_size=batch_size, monitors=monitors)
Run Code Online (Sandbox Code Playgroud)

......让我想起了Hitchhiker's Guide的这个场景:

Arthur Dent:如果按此按钮会怎样?

福特Prefect:我不会 -

Arthur Dent:哦.

福特知府:发生了什么?

Arthur Dent:一个标志亮了起来,说'请不要再按此按钮'.

也就是说:partial_fit似乎存在的唯一目的是告诉你不要使用它.

此外,通过partial_fit迭代地使用训练文件块生成的模型比通过fit在整个训练文件上使用生成的模型小得多,这强烈暗示只有最后一个partial_fit训练块实际上"占用".

Dom*_*ack 19

查看tf.data.DatasetAPI.有许多方法可以创建数据集.我将概述三个 - 但你只需要实现一个.

我假设你的csv文件的每一行都是n_features浮点值后跟一个int值.

创建一个 tf.data.Dataset

用一个python生成器包装 Dataset.from_generator

最简单的方法是包装一个原生的python生成器.我鼓励你先尝试这个,只有在你看到严重的性能问题时才会改变.

def read_csv(filename):
    with open(filename, 'r') as f:
        for line in f.readlines():
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield features, label

def get_dataset():
    filename = 'my_train_dataset.csv'
    generator = lambda: read_csv(filename)
    return tf.data.Dataset.from_generator(
        generator, (tf.float32, tf.int32), ((n_features,), ()))
Run Code Online (Sandbox Code Playgroud)

这种方法非常通用,允许您read_csv独立于TensorFlow 测试您的发电机功能().

包装基于索引的python函数

上面的一个缺点是使用大小的shuffle缓冲区对结果数据集进行混洗,info需要builder加载示例.这将在您的管道中创建周期性暂停(大n)或导致可能较差的混乱(小n).

class MyCsvDatasetBuilder(tfds.core.GeneratorBasedBuilder):
  VERSION = tfds.core.Version("0.0.1")

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        description=(
            "My dataset"),
        features=tfds.features.FeaturesDict({
            "features": tfds.features.Tensor(
              shape=(FEATURE_SIZE,), dtype=tf.float32),
            "label": tfds.features.ClassLabel(
                names=CLASS_NAMES),
            "index": tfds.features.Tensor(shape=(), dtype=tf.float32)
        }),
        supervised_keys=("features", "label"),
    )

  def _split_generators(self, dl_manager):
    paths = dict(
      train='/path/to/train.csv',
      test='/path/to/test.csv',
    )
    # better yet, if the csv files were originally downloaded, use
    # urls = dict(train=train_url, test=test_url)
    # paths = dl_manager.download(urls)
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            num_shards=10,
            gen_kwargs=dict(path=paths['train'])),
        tfds.core.SplitGenerator(
            name=tfds.Split.TEST,
            num_shards=2,
            gen_kwargs=dict(cvs_path=paths['test']))
    ]

  def _generate_examples(self, csv_path):
    with open(csv_path, 'r') as f:
        for i, line in enumerate(f.readlines()):
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield dict(features=features, label=label, index=i)
Run Code Online (Sandbox Code Playgroud)

简而言之,我们只创建一个记录索引的数据集(或任何我们可以完全加载到内存中的小记录ID).然后我们对这个最小数据集进行改组/重复操作,然后n通过n和执行实际数据的索引map.见tf.data.Dataset.maptf.py_func下面的用法部分.请注意,这需要您的数据可以按行访问,因此您可能需要转换Using with Estimators为其他格式.

TextLineDataset

您也可以Testing in isolation使用a直接读取文件csv.

builder = MyCsvDatasetBuilder()
builder.download_and_prepare()  # will only take time to run first time
# as_supervised makes output (features, label) - good for model.fit
datasets = builder.as_dataset(as_supervised=True)

train_ds = datasets['train']
test_ds = datasets['test']
Run Code Online (Sandbox Code Playgroud)

csv由于tf.data.TextLineDataset需要批量处理,因此该功能有点复杂.如果在解析之前批量处理数据集,则可以使其稍微简单一些.

def get_record(i):
    # load the ith record using standard python, return numpy arrays
    return features, labels

def get_inputs(batch_size, is_training):

    def tf_map_fn(index):
        features, labels = tf.py_func(
            get_record, (index,), (tf.float32, tf.int32), stateful=False)
        features.set_shape((n_features,))
        labels.set_shape(())
        # do data augmentation here
        return features, labels

    epoch_size = get_epoch_size()
    dataset = tf.data.Dataset.from_tensor_slices((tf.range(epoch_size,))
    if is_training:
        dataset = dataset.repeat().shuffle(epoch_size)
    dataset = dataset.map(tf_map_fn, (tf.float32, tf.int32), num_parallel_calls=8)
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, labels = dataset.make_one_shot_iterator().get_next()
    return features, labels
Run Code Online (Sandbox Code Playgroud)

TFRecordDataset

或者,您可以将parse_row文件转换为TFRecord文件并使用TFRecordDataset.有一个彻底的教程在这里.

第1步:将tf.decode_csv数据转换为TFRecords数据.实施例下面的代码(参见csvcsv上面的例子).

def get_record_defaults():
  zf = tf.zeros(shape=(1,), dtype=tf.float32)
  zi = tf.ones(shape=(1,), dtype=tf.int32)
  return [zf]*n_features + [zi]

def parse_row(tf_string):
    data = tf.decode_csv(
        tf.expand_dims(tf_string, axis=0), get_record_defaults())
    features = data[:-1]
    features = tf.stack(features, axis=-1)
    label = data[-1]
    features = tf.squeeze(features, axis=0)
    label = tf.squeeze(label, axis=0)
    return features, label

def get_dataset():
    dataset = tf.data.TextLineDataset(['data.csv'])
    return dataset.map(parse_row, num_parallel_calls=8)
Run Code Online (Sandbox Code Playgroud)

这只需要运行一次.

第2步:编写解码这些记录文件的数据集.

def parse_batch(tf_string):
    data = tf.decode_csv(tf_string, get_record_defaults())
    features = data[:-1]
    labels = data[-1]
    features = tf.stack(features, axis=-1)
    return features, labels

def get_batched_dataset(batch_size):
    dataset = tf.data.TextLineDataset(['data.csv'])
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(parse_batch)
    return dataset
Run Code Online (Sandbox Code Playgroud)

将数据集与估算器一起使用

with tf.python_io.TFRecordWriter("my_train_dataset.tfrecords") as writer:
    for features, labels in read_csv('my_train_dataset.csv'):
        example = tf.train.Example()
        example.features.feature[
            "features"].float_list.value.extend(features)
        example.features.feature[
            "label"].int64_list.value.append(label)
        writer.write(example.SerializeToString())
Run Code Online (Sandbox Code Playgroud)

单独测试数据集

我强烈建议您独立于估算器测试数据集.使用上面的内容read_csv,应该就这么简单

def parse_function(example_proto):
    features = {
        'features': tf.FixedLenFeature((n_features,), tf.float32),
        'label': tf.FixedLenFeature((), tf.int64)
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features['features'], parsed_features['label']

def get_dataset():
    dataset = tf.data.TFRecordDataset(['data.tfrecords'])
    dataset = dataset.map(parse_function)
    return dataset
Run Code Online (Sandbox Code Playgroud)

性能

假设您使用GPU来运行网络,除非from_generator文件的每一行都很大并且您的网络很小,否则您可能不会注意到性能上的差异.这是因为get_inputs实现强制在CPU上执行数据加载/预处理,并且csv意味着可以在CPU上准备下一批,因为当前批处理正在GPU上进行训练.唯一的例外是,如果在每个记录上有大量数据的数据集上有大量的随机大小,那么在通过GPU运行任何内容之前,最初需要花费一些时间来加载大量示例.