How to improve the performance of this data pipeline for my TensorFlow model

Sal*_*ali 13 python tensorflow tensorflow-datasets

I have a TensorFlow model that I am training on google-colab. The actual model is more complex, but I condensed it into a reproducible example (removing saving/restoring, learning-rate decay, assertions, TensorBoard events, gradient clipping, and so on). The model works reasonably (converges to an acceptable loss), and I am looking for a way to speed up the training (in iterations per second).

Currently on colab's GPU it takes 10 minutes to train 1000 iterations. My current batch size is 512, which means the model processes ~850 examples per second (I would prefer to keep the batch size at 512 unless another size gives a reasonable speedup; changing the batch size by itself does not change the speed).


So currently I have my data stored in tfrecord format: here is a 500Mb example file, and the total data size is ~0.5Tb. This data goes through a reasonably heavy preprocessing step (I cannot do the preprocessing in advance, as it would increase the size of my tfrecords beyond what I can afford). The preprocessing is done via tf.data, and the output tensors ((batch_size, 8, 8, 24), which is treated as NHWC, and (batch_size, 10)) are passed into the model. The example colab does not contain the simplified model and serves only as an example.


I have tried a few approaches to speed up the training:

  • Manual device placement (data preprocessing on the cpu, propagation on the gpu), but all of my attempts resulted in slower training (slowdowns of 10% to 50%).
  • Improving the data preprocessing. I reviewed the tf.data video and the data tutorial. I tried almost every technique from that tutorial with no improvement (speed decrease of 0% to 15%). In particular I tried:
    • dataset.prefetch(...)
    • passing num_parallel_calls to map
    • combining map and batch with tf.contrib.data.map_and_batch
    • using parallel_interleave
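Among these, dataset.prefetch(...) is about overlapping preprocessing with the training step: a background worker prepares the next batch while the current one is being consumed. A framework-free sketch of the idea (the preprocess function, timings, and buffer size here are hypothetical stand-ins, not part of the original pipeline):

```python
import queue
import threading
import time

def preprocess(i):
    """Stand-in for a heavy per-batch preprocessing step (hypothetical)."""
    time.sleep(0.001)
    return i * 2

def prefetching_batches(n, buffer_size=1):
    """Yield preprocessed batches while a background thread prepares the
    next one, analogous to dataset.prefetch(buffer_size)."""
    q = queue.Queue(maxsize=buffer_size)

    def producer():
        for i in range(n):
            q.put(preprocess(i))
        q.put(None)  # sentinel: no more batches

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()
        if item is None:
            return
        yield item

print(list(prefetching_batches(4)))  # [0, 2, 4, 6]
```

If the training step and preprocessing take comparable time, this overlap can hide most of the preprocessing cost; it does not help when one side completely dominates.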

The code related to the data preprocessing is here (this is a full reproducible example with example data):

_keys_to_map = {
    'd': tf.FixedLenFeature([], tf.string),  # data
    's': tf.FixedLenFeature([], tf.int64),   # score
}


def _parser(record):
    parsed = tf.parse_single_example(record, _keys_to_map)
    return parsed['d'], parsed['s']


def init_tfrecord_dataset():
  files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
  random.shuffle(files_train)

  with tf.name_scope('tfr_iterator'):
    ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
    ds = ds.shuffle(buffer_size=10000)             # select elements randomly from the buffer
    ds = ds.map(_parser)                           # map them based on tfrecord format
    ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
    ds = ds.repeat()                               # iterate infinitely 

    return ds.make_initializable_iterator()        # initialize the iterator


def iterator_to_data(iterator):
  """Creates the part of the graph which reads raw data from an iterator and transforms it into
  data ready to be passed to the model.

  Args:
    iterator      - iterator created by `init_tfrecord_dataset`

  Returns:
    data_board      - (BATCH_SIZE, 8, 8, 24) which you can think of as NHWC for images.
    data_flags      - (BATCH_SIZE, 10)
    combined_score  - (BATCH_SIZE,)
  """

  b = tf.constant((128, 64, 32, 16, 8, 4, 2, 1), dtype=tf.uint8, name='unpacked_const')

  with tf.name_scope('tfr_parse'):
    with tf.name_scope('packed_data'):
      next_element = iterator.get_next()
      data_packed, score_int = next_element
      score = tf.cast(score_int, tf.float64, name='score_float')

    # https://stackoverflow.com/q/45454470/1090562
    with tf.name_scope('data_unpacked'):
      data_unpacked = tf.reshape(tf.mod(tf.to_int32(tf.decode_raw(data_packed, tf.uint8)[:,:,None] // b), 2), [BATCH_SIZE, 1552], name='data_unpack')

    with tf.name_scope('score'):
      with tf.name_scope('is_mate'):
        score_is_mate = tf.cast(tf.squeeze(tf.slice(data_unpacked, [0, 1546], [BATCH_SIZE, 1])), tf.float64, name='is_mate')
      with tf.name_scope('combined'):
        combined_score = (1 - score_is_mate) * VALUE_A * tf.tanh(score / VALUE_K) + score_is_mate * tf.sign(score) * (VALUE_A + (1 - VALUE_A) / (VALUE_B - 1) * tf.reduce_max(tf.stack([tf.zeros(BATCH_SIZE, dtype=tf.float64), VALUE_B - tf.abs(score)]), axis=0))


    with tf.name_scope('board'):
      with tf.name_scope('reshape_layers'):
        data_board = tf.reshape(tf.slice(data_unpacked, [0, 0], [BATCH_SIZE, 8 * 8 * 24]), [BATCH_SIZE, 8, 8, 24], name='board_reshape')

      with tf.name_scope('combine_layers'):  
        data_board = tf.cast(tf.stack([
          data_board[:,:,:, 0],
          data_board[:,:,:, 4],
          data_board[:,:,:, 8],
          data_board[:,:,:,12],
          data_board[:,:,:,16],
          data_board[:,:,:,20],
          - data_board[:,:,:, 1],
          - data_board[:,:,:, 5],
          - data_board[:,:,:, 9],
          - data_board[:,:,:,13],
          - data_board[:,:,:,17],
          - data_board[:,:,:,21],
          data_board[:,:,:, 2],
          data_board[:,:,:, 6],
          data_board[:,:,:,10],
          data_board[:,:,:,14],
          data_board[:,:,:,18],
          data_board[:,:,:,22],
          - data_board[:,:,:, 3],
          - data_board[:,:,:, 7],
          - data_board[:,:,:,11],
          - data_board[:,:,:,15],
          - data_board[:,:,:,19],
          - data_board[:,:,:,23],
        ], axis=3), tf.float64, name='board_compact')

    with tf.name_scope('flags'):
      data_flags = tf.cast(tf.slice(data_unpacked, [0, 1536], [BATCH_SIZE, 10]), tf.float64, name='flags')

  return data_board, data_flags, combined_score
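The bit-unpacking line in tfr_parse (integer-dividing each packed byte by descending powers of two, then taking the result mod 2) can be illustrated with a small NumPy sketch; the two bytes below are made up for the example:

```python
import numpy as np

# Mirror of the tf.decode_raw + "// b" + tf.mod(..., 2) trick above:
# each uint8 byte is expanded into its 8 bits by integer division by
# descending powers of two followed by a remainder mod 2.
b = np.array([128, 64, 32, 16, 8, 4, 2, 1], dtype=np.uint8)
packed = np.array([0b10100001, 0b00000011], dtype=np.uint8)  # made-up bytes

bits = (packed[:, None] // b) % 2
print(bits.tolist())
# [[1, 0, 1, 0, 0, 0, 0, 1], [0, 0, 0, 0, 0, 0, 1, 1]]
```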

I am looking for practical solutions (I have already tried plenty of theoretical ideas) that will improve the speed of training (in terms of examples/second). I am not looking for ways to improve the accuracy of the model (or to modify the model), as this is just a test model.

I have spent a significant amount of time trying to optimize this (and gave up). So I would be happy to award a bounty of 200 for a working solution with a nice explanation.
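For reference, the combined_score blend in the pipeline can be checked with a scalar sketch. The constants below (VALUE_A, VALUE_K, VALUE_B) are hypothetical placeholders, since their real values are not shown in the question:

```python
import math

# Scalar version of the combined_score expression from the pipeline.
VALUE_A, VALUE_K, VALUE_B = 0.8, 100.0, 10.0  # hypothetical placeholders

def combined_score(score, is_mate):
    # Regular positions: a tanh squash of the raw score.
    regular = (1.0 - is_mate) * VALUE_A * math.tanh(score / VALUE_K)
    # Mate positions: a linear ramp from VALUE_A up to 1 as abs(score) shrinks.
    ramp = max(0.0, VALUE_B - abs(score))
    mate = is_mate * math.copysign(1.0, score) * (
        VALUE_A + (1.0 - VALUE_A) / (VALUE_B - 1.0) * ramp)
    return regular + mate

print(round(combined_score(50.0, 0.0), 4))  # 0.3697 (tanh branch)
print(round(combined_score(3.0, 1.0), 4))   # 0.9556 (mate branch)
```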

mrr*_*rry 9

hampi's suggestion to profile your training job is a good one, and may be necessary to understand the actual bottlenecks in your pipeline. The other suggestions in the Input Pipeline Performance Guide should be useful as well.

However, there is another possible "quick fix" that might be useful. In some cases, the amount of work in a Dataset.map() transformation can be very small, and dominated by the cost of invoking the function for each element. In those cases, we often try to vectorize the map function, and move it after the Dataset.batch() transformation, in order to invoke the function fewer times (1/512 as many times, in this case), and to perform larger (and potentially easier to parallelize) operations on each batch. Fortunately, your pipeline can be vectorized as follows:

def _batch_parser(record_batch):
  # NOTE: Use `tf.parse_example()` to operate on batches of records.
  parsed = tf.parse_example(record_batch, _keys_to_map)
  return parsed['d'], parsed['s']

def init_tfrecord_dataset():
  files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
  random.shuffle(files_train)

  with tf.name_scope('tfr_iterator'):
    ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
    ds = ds.shuffle(buffer_size=10000)             # select elements randomly from the buffer
    # NOTE: Change begins here.
    ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
    ds = ds.map(_batch_parser)                     # map batches based on tfrecord format
    # NOTE: Change ends here.
    ds = ds.repeat()                               # iterate infinitely 

    return ds.make_initializable_iterator()        # initialize the iterator

Currently, vectorization is a change you have to make manually, but the tf.data team is working on an optimization pass that provides automatic vectorization.
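The effect of moving the map after the batch can be seen with a framework-free counting sketch; the parser bodies below are stand-ins for _parser and _batch_parser, and only count invocations:

```python
# "map then batch" calls the parser once per record; "batch then map"
# calls it once per batch, i.e. BATCH_SIZE times fewer invocations.
BATCH_SIZE = 512
N = 2048
calls = {"per_element": 0, "per_batch": 0}

def parse_one(record):        # stand-in for _parser
    calls["per_element"] += 1
    return record

def parse_batch(records):     # stand-in for _batch_parser
    calls["per_batch"] += 1
    return records

records = list(range(N))

# map then batch
mapped = [parse_one(r) for r in records]
_ = [mapped[i:i + BATCH_SIZE] for i in range(0, N, BATCH_SIZE)]

# batch then map
_ = [parse_batch(records[i:i + BATCH_SIZE]) for i in range(0, N, BATCH_SIZE)]

print(calls)  # {'per_element': 2048, 'per_batch': 4}
```

The per-invocation overhead saved this way matters most when the per-element work is tiny; tf.parse_example on a whole batch also tends to be faster than BATCH_SIZE calls to tf.parse_single_example.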


小智 6

I have a couple of suggestions:

1) After the batch is created, the entire batch is processed by the iterator_to_data() function. This doesn't really distribute the work across multiple threads, at least not at the api level. Instead, you could try something like this in the init_tfrecord_dataset() function:

ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000)             # select elements randomly from the buffer
ds = ds.map(_parser)  
ds = ds.map(map_func=iterator_to_data, num_parallel_calls=FLAGS.num_preprocessing_threads)
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.repeat()

You might also want to change a few lines in the iterator_to_data() function, since with the above changes the input argument is no longer an iterator.

2) You might also want to gather profiling information using something like tf.train.ProfilerHook. This can tell you whether the bottleneck is on the cpu or the gpu. For example, if the bottleneck is on the CPU, you may see GPU ops waiting for memcpyHtoD ops to complete.