我的训练过程使用tfrecord格式表示训练和评估数据集.
我测试了阅读器的基准,只有8000记录/秒.和io速度(参见iotop命令)只需400KB-500KB/s.
我在这里使用protobuf的cpp版本
如果可能的话,提供一个可重复性最小的示例(我们通常没有时间阅读数百行代码)
def read_and_decode(filename_queue):
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
return serialized_example
serialized_example = read_and_decode(filename_queue)
batch_serialized_example = tf.train.shuffle_batch(
[serialized_example],
batch_size=batch_size,
num_threads=thread_number,
capacity=capacity,
min_after_dequeue=min_after_dequeue)
features = tf.parse_example(
batch_serialized_example,
features={
"label": tf.FixedLenFeature([], tf.float32),
"ids": tf.VarLenFeature(tf.int64),
"values": tf.VarLenFeature(tf.float32),
})
Run Code Online (Sandbox Code Playgroud)
您尝试了哪些其他尝试的解决方案?
我尝试在tf.train.shuffle_batch中设置num_threads但不起作用.
似乎当设置为2个线程时,它工作在8000records/s,当放大线程数时,它会变慢.(我删除所有花费cpus的操作.只需读取数据.)
我的服务器是24核心cpu.
这里的问题是每个都有一个固定的成本开销session.run,并且用队列中的许多小例子填充队列将会很慢.
特别是,每个session.run大约100-200 usec,所以你session.run每秒只能进行大约5k-10k的调用.
如果进行Python分析(python -m cProfile),这个问题很明显,但很难看出是从时间轴配置文件还是CPU配置文件开始.
解决方法是使用enqueue_many批量添加到队列中的东西.我从https://gist.github.com/ericyue/7705407a88e643f7ab380c6658f641e8获取了你的基准测试并对其进行了修改,以便每次.run调用将多个项目排入队列,并且可以提高10倍速度.
修改是修改tf.batch调用如下:
if enqueue_many:
reader = tf.TFRecordReader(options = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB))
queue_batch = []
for i in range(enqueue_many_size):
_, serialized_example = reader.read(filename_queue)
queue_batch.append(serialized_example)
batch_serialized_example = tf.train.shuffle_batch(
[queue_batch],
batch_size=batch_size,
num_threads=thread_number,
capacity=capacity,
min_after_dequeue=min_after_dequeue,
enqueue_many=True)
Run Code Online (Sandbox Code Playgroud)
如需完整信息,请访问:https: //github.com/yaroslavvb/stuff/blob/master/ericyue-slowreader/benchmark.py
由于现在大部分时间花在队列操作上,因此很难将其优化得更快.看一下刚刚将整数添加到队列中的精简版本,你也可以获得类似的速度,并且考虑时间线,时间花费在出列操作上.
每个出列操作大约需要60个usec,但平均有5个并行运行,因此每个出列时可获得12个usec.这意味着在最好的情况下,你每秒会得到<200k的例子.
这是Yaroslav答案的简单加速建筑:
Tensorflow有一个内置函数tf.TFRecordReader.read_up_to,它读取每个session.run()调用中的多个记录,从而消除多个调用引起的多余开销.
enqueue_many_size = SOME_ENQUEUE_MANY_SIZE
reader = tf.TFRecordReader(options = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB))
_, queue_batch = reader.read_up_to(filename_queue, enqueue_many_size)
batch_serialized_example = tf.train.shuffle_batch(
[queue_batch],
batch_size=batch_size,
num_threads=thread_number,
capacity=capacity,
min_after_dequeue=min_after_dequeue,
enqueue_many=True)
Run Code Online (Sandbox Code Playgroud)
与Yaroslav的答案一样,您需要进行设置,enqueue_many=True以便批处理函数知道它正在接受多个记录.
这在我的用例中非常快.
| 归档时间: |
|
| 查看次数: |
2662 次 |
| 最近记录: |