为了在Tensorflow中训练LSTM模型,我将数据结构化为tf.train.SequenceExample格式并将其存储到TFRecord文件中.我现在想使用新的DataSet API来生成用于训练的填充批次.在文档中有一个使用padded_batch的例子,但对于我的数据,我无法弄清楚padded_shapes应该是什么值.
为了将TFrecord文件读入批处理,我编写了以下Python代码:
import math
import tensorflow as tf
import numpy as np
import struct
import sys
import array
if(len(sys.argv) != 2):
print "Usage: createbatches.py [RFRecord file]"
sys.exit(0)
vectorSize = 40
inFile = sys.argv[1]
def parse_function_dataset(example_proto):
sequence_features = {
'inputs': tf.FixedLenSequenceFeature(shape=[vectorSize],
dtype=tf.float32),
'labels': tf.FixedLenSequenceFeature(shape=[],
dtype=tf.int64)}
_, sequence = tf.parse_single_sequence_example(example_proto, sequence_features=sequence_features)
length = tf.shape(sequence['inputs'])[0]
return sequence['inputs'], sequence['labels']
sess = tf.InteractiveSession()
filenames = tf.placeholder(tf.string, shape=[None])
dataset = tf.contrib.data.TFRecordDataset(filenames) …Run Code Online (Sandbox Code Playgroud) 所以,我一直在玩TensorFlow数据集API来加载图像和分段掩码(用于语义分割项目),我希望能够生成批量的图像和掩码,每个图像随机经过任何一个预处理功能的组合,如亮度变化,对比度变化,裁剪,饱和度变化等.因此,我的批次中的第一个图像可能没有预处理,第二个可能有饱和度变化,第三个可能有亮度和饱和度等.
我尝试了以下方法:
import tensorflow as tf
from tensorflow.contrib.data import Dataset, Iterator
import random
def _resize_image(image, mask):
image = tf.image.resize_bicubic(image, [480, 640], True)
mask = tf.image.resize_bicubic(mask, [480, 640], True)
return image, mask
def _corrupt_contrast(image, mask):
image = tf.image.random_contrast(image, 0, 5)
return image, mask
def _corrupt_saturation(image, mask):
image = tf.image.random_saturation(image, 0, 5)
return image, mask
def _corrupt_brightness(image, mask):
image = tf.image.random_brightness(image, 5)
return image, mask
def _random_crop(image, mask):
seed = random.random()
image = tf.random_crop(image, [240, 320, 3], seed=seed)
mask = tf.random_crop(mask, [240, …Run Code Online (Sandbox Code Playgroud) 我有一个tensorflow模型,我正在google-colab上训练.实际模型更复杂,但我将其浓缩为可重复的示例(删除了保存/恢复,学习速率衰减,断言,张量事件,渐变剪切等).该模型合理地工作(收敛到可接受的损失),我正在寻找一种加速训练的方法(每秒迭代次数).
目前在colab的GPU上,需要10分钟来训练1000次迭代.我目前的批量大小为512,这意味着该模型每秒处理约850个示例(我希望批量大小为512,除非其他大小提供合理的加速.本身改变批量大小不会改变速度).
所以目前我有一个以tfrecord格式存储的数据:这是一个500Mb的示例文件,总数据大小约为0.5Tb.这些数据经过了一个相当繁重的预处理步骤(我不能事先进行预处理,因为它会增加我的tfrecords的大小,超出我能承受的范围).预处理通过tf.data完成,输出张量((batch_size, 8, 8, 24)被视为NHWC (batch_size, 10))被传递到模型中.示例colab不包含简化模型,仅作为示例.
我尝试了一些方法来加速训练:
dataset.prefetch(...)num_parallel_calls到地图tf.contrib.data.map_and_batchparallel_interleave与数据预处理相关的代码在这里(这是一个完整的可重复示例,带有示例数据):
_keys_to_map = {
'd': tf.FixedLenFeature([], tf.string), # data
's': tf.FixedLenFeature([], tf.int64), # score
}
def _parser(record):][3]
parsed = tf.parse_single_example(record, _keys_to_map)
return parsed['d'], parsed['s']
def init_tfrecord_dataset():
files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
random.shuffle(files_train)
with …Run Code Online (Sandbox Code Playgroud) 似乎MonitoredTrainingSession在第一次调用.run(..)之前做了一些操作(logging?),这意味着当我这样做时:
train_data = reader.traindata() # returns a tf.contrib.data.Dataset
it = tf.contrib.data.Iterator.from_structure(train_data.output_types, train_data.output_shapes)
init_train = it.make_initializer(train_data)
ne = it.get_next()
ts = tf.train.MonitoredTrainingSession(checkpoint_dir=save_path)
... no calls to ts.run ...
ts.run(init_train)
Run Code Online (Sandbox Code Playgroud)
这会产生错误:
FailedPreconditionError (see above for traceback): GetNext() failed because the iterator has not been initialized. Ensure that you have run the initializer operation for this iterator before getting the next element
Run Code Online (Sandbox Code Playgroud)
因此,在运行我提供的操作之前,它就像MonitoredTrainingSession正在执行某些操作一样接缝,从而无法使用来自Dataset的可重新初始化的迭代器进行togeather.
我相信我错过了一些东西,并希望听到:-)
我们在Tensorflow上运行多GPU作业,并评估从基于队列的模型(使用string_input_producer接口)到新的Tensorflow Dataset API的迁移.后者似乎提供了一种更容易的方式来同时切换列车和验证.
下面的代码片段显示了我们如何做到这一点.
train_dataset, train_iterator = get_dataset(train_files, batch_size, epochs)
val_dataset, val_iterator = get_dataset(val_files, batch_size, epochs)
is_validating = tf.placeholder(dtype=bool, shape=())
next_batch = tf.cond(is_validating,
lambda: val_iterator.get_next(),
lambda: train_iterator.get_next())
validation_tower = self.num_gpus - 1
tower_grads = []
for i in range(self.num_gpus):
with tf.variable_scope(tf.get_variable_scope(),reuse=(i > 0)):
with tf.device('/gpu:%d' % i), tf.name_scope('%s_%d' % ('gpu_', i)) as scope:
if i == validation_tower:
images, labels = next_batch
# Loss funcs snipped out
else:
images, labels = next_batch
# Loss funcs snipped out
Run Code Online (Sandbox Code Playgroud)
get_dataset函数构建数据集,设置映射函数和批处理大小.它还构建了一个迭代器,但没有初始化它.迭代器的初始化发生在会话开始之前.
会话运行时提供is_validating布尔值,我们通过feed_dict传递的每个步骤is_validating为True,以使用验证数据集
我的问题是:
假设我有8个gpus,所以我们对7个GPU进行了培训.对于这7个GPU中的每一个,Iterator是否从同一点前进,从而为所有7个GPU提供相同的数据?
我正在尝试使用TF的新功能,即Data API,我不确定prefetch的工作原理.在下面的代码中
def dataset_input_fn(...)
dataset = tf.data.TFRecordDataset(filenames, compression_type="ZLIB")
dataset = dataset.map(lambda x:parser(...))
dataset = dataset.map(lambda x,y: image_augmentation(...)
, num_parallel_calls=num_threads
)
dataset = dataset.shuffle(buffer_size)
dataset = dataset.batch(batch_size)
dataset = dataset.repeat(num_epochs)
iterator = dataset.make_one_shot_iterator()
Run Code Online (Sandbox Code Playgroud)
在我上面的每一行之间有关系dataset=dataset.prefetch(batch_size)吗?或者也许它应该是在output_buffer_size数据集来自何时将要使用的每个操作之后tf.contrib.data?
假设我以这种方式定义了一个数据集:
filename_dataset = tf.data.Dataset.list_files("{}/*.png".format(dataset))
Run Code Online (Sandbox Code Playgroud)
如何获取数据集中的元素数量(因此,构成一个纪元的单个元素的数量)?
我知道tf.data.Dataset已经知道数据集的维度,因为该repeat()方法允许在指定的时期内重复输入管道。因此,它必须是获取此信息的一种方法。
我已经看到了很多关于使用LSTM进行张量流时间序列的指南,但我仍然不确定当前读取和处理数据的最佳实践 - 特别是当人们应该使用tf.data.DatasetAPI时.
在我的情况下,我有一个文件data.csv与我features,并希望做以下两个任务:
计算目标 - 目标时间t是某个范围内某些列的百分比变化,即
labels[i] = features[i + h, -1] / features[i, -1] - 1
Run Code Online (Sandbox Code Playgroud)
我想h在这里成为一个参数,所以我可以尝试不同的视野.
滚动窗口 - 出于培训目的,我需要将我的功能滚动到长度为的窗口window:
train_features[i] = features[i: i + window]
Run Code Online (Sandbox Code Playgroud)我很乐意使用pandas或构建这些对象numpy,所以我不会问如何实现这一点 - 我的问题是具体应该是什么样的管道tensorflow.
编辑:我想我也想知道我列出的2个任务是否适合数据集api,或者我最好使用其他库来处理它们?
在张量流Dataset管道中,我想定义一个自定义映射函数,它接受一个输入元素(数据样本)并返回多个元素(数据样本).
下面的代码是我的尝试,以及期望的结果.
我不能很好地遵循文档,tf.data.Dataset().flat_map()以了解它是否适用于此处.
import tensorflow as tf
input = [10, 20, 30]
def my_map_func(i):
return [[i, i+1, i+2]] # Fyi [[i], [i+1], [i+2]] throws an exception
ds = tf.data.Dataset.from_tensor_slices(input)
ds = ds.map(map_func=lambda input: tf.py_func(
func=my_map_func, inp=[input], Tout=[tf.int64]
))
element = ds.make_one_shot_iterator().get_next()
with tf.Session() as sess:
for _ in range(9):
print(sess.run(element))
Run Code Online (Sandbox Code Playgroud)
结果:
(array([10, 11, 12]),)
(array([20, 21, 22]),)
(array([30, 31, 32]),)
Run Code Online (Sandbox Code Playgroud)
期望的结果:
(10)
(11)
(12)
(20)
(21)
(22)
(30)
(31)
(32)
Run Code Online (Sandbox Code Playgroud) 我正在用 tf.data.Dataset 预处理的 CSV 文件中的数据训练我的 Tensorflow 模型。但是,我希望模型分叉为对应于一组不同 csv 列的三个分支,并且 model.fit 需要为每个输出提供一个单独的数据集。CSV 文件的所有列都需要进行相同的预处理,因此准备它的最有效方法是加载整个文件,对其进行处理,然后将数据集拆分为三个部分。但是,我正在努力寻找这样做的方法。
我希望 dataset.map 允许我使用以下操作选择一些列:
dset = dset.map(lambda x: x[[1, 2, 3, 7]])
Run Code Online (Sandbox Code Playgroud)
但似乎 tensorflow 将其解释为x[1][2][3][7]相反。
我发现创建单独数据集的唯一可行方法是从头开始:
y = []
for cls, keys in output_classes.items():
tmp = tf.data.experimental.CsvDataset(data_path, [tf.int32 for i in keys], select_cols=keys)
[...]
y.append(tmp)
y = tf.data.Dataset.zip(tuple(y))
Run Code Online (Sandbox Code Playgroud)
不幸的是,它会产生大量不必要的开销并极大地减慢训练速度。
有没有办法按功能子集拆分 tf.data.Dataset 对象?