TensorFlow:如何在培训期间多次评估验证数据队列?

kmh*_*ann 18 python machine-learning tensorflow

TL;博士

如何在每次K次训练迭代后评估验证集,使用单独的队列进行训练和验证数据,而不需要tf.Sessions在多个进程中分开?鉴于我的特殊问题,似乎没有一种干净的方法来实现这一点,而我目前的解决方法(我认为可行)会给我一些未定义的行为.救命!

整个故事

我想在每次K训练迭代中评估验证集,我无法弄清楚如何在TensorFlow中正确实现这一点.这应该是最常见的操作之一,但感觉TensorFlow的API /架构在这里对我有用,或者至少让事情变得不必要.

我的假设是:

  • [A1]此处描述的培训/验证多进程模型https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines不适用于我的问题,因为我必须假设没有足够的GPU内存可用加载变量两次.
  • [A2]我想在每次K训练迭代中评估验证集.
  • [A3]训练和验证数据都不能简单地从磁盘读取,而是在运行中生成.这使得不可能预先可靠地预先计算验证集的大小.
  • [A4]验证集太大,无法预先计算并存储到磁盘上.
  • [A5]有效验证集大小不一定是批量大小的倍数.

训练输入管道设置如下:

  • A tf.train.slice_input_producer()生成(混洗)文件名列表,每个文件名都引用原始输入数据.
  • 自定义数据生成功能从每个原始输入数据块生成可变数量的训练样本/标签.
  • 生成的培训样本/标签tf.train.shuffle_batch()在被送入网络之前排队等候.

由于[A3],[A4],[A5],验证输入管道以几乎相同的方式设置,除了最终输入队列是通过生成tf.train.batch(),因为不希望进行混洗.由于上述假设,基于feed_dict的方法也是不可行的,并且看起来与使用更高级别的功能(例如,使用更高级别的功能)不相容tf.train.batch.

但是,使用两组不同的队列进行培训和验证的直接实现不起作用.据我了解,我有两个选择:

  • [B1]将num_epochs验证的参数设置tf.train.slice_input_producerNone.

    在这种情况下,验证集会无休止地循环,但我需要提前知道验证集的大小,以明确限制每次运行验证集时要评估的批次数.此外,如果验证集大小不能被批量大小整除,我将在最后一批中总是拉一点.由于这会每次都改变验证数据的评估顺序,因此这是不可接受的.

  • [B2]将num_epochs验证的参数设置tf.train.slice_input_producer1,并另外将函数的allow_smaller_final_batch参数设置tf.train.batchTrue.

    在这种情况下,验证集只循环一次,之后相应的队列永远关闭.默认情况下,这将使评估验证集不可能两次或多次.由于我不知道在TensorFlow中重新打开队列的好方法,我需要解决这个限制.

由于选项[B1]的更大限制,我选择解决选项[B2]的问题.概述我当前方法的(伪)代码如下:

训练循环应该是相当规范的.每次K次迭代,都会调用一个评估验证集的函数.请注意,我只启动名称以"train_"开头的队列; 这些是为收集生成的训练数据而设置的队列.为了做到这一点,我创建了两个辅助函数,get_queues_by_namestart_queue_runners.

def train_loop(train_ops, vali_ops, ...):
    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        sess.run([tf.initialize_all_variables(), tf.initialize_local_variables()])
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("train")
        threads = start_queue_runners(sess, coord, queues)

        try:
            for step in range(start_iteration, num_train_iterations):
                # Runs the session on validation set
                if step % K == 0:
                    validation_results = run_validation(vali_ops, snapshot_file)

                # TRAINING:
                # ...

        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)
Run Code Online (Sandbox Code Playgroud)

辅助函数如下所示:

def get_queues_by_name(name):
    """Retrieves all queues that contain the string given by 'name'"""
    all_queues = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
    return [q for q in all_queues if name in q.name]


def start_queue_runners(session, coordinator, queues):
    """Similar to tf.train.start_queue_runners but now accepts a list of queues instead of a graph collection"""
    with session.graph.as_default():
        threads = []
        for queue in queues:
            log("Queue", "Starting queue '%s'" % queue.name, level=2)
            threads.extend(queue.create_threads(session, coordinator, daemon=True, start=True))
    return threads
Run Code Online (Sandbox Code Playgroud)

在该run_validation函数中,我选择的针对已关闭队列问题的解决方法是创建一个新的tf.Session.我也只启动与队列收集验证集数据相关联的线程.

def run_validation(ops, snapshot_file):  # Called inside train_loop()
    results = None
    loader = tf.train.Saver()

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        sess.run([tf.initialize_local_variables()])
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("eval")
        threads = start_queue_runners(sess, coord, queues)

        # Performs the inference in batches
        try:
            # Evaluate validation set:
            results = eval_in_batches(ops, sess)
        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)

    return results
Run Code Online (Sandbox Code Playgroud)

我不知道tf.Session在这里创建一个新的是一个好主意,但它似乎是完成重新启动验证队列的唯一方法.理想情况下,我也不想重新加载模型快照,因为这似乎在概念上是不必要的.

此代码的问题在于我在运行期间看到不稳定/未定义的行为,例如在验证集评估期间NaN或Inf出现在网络内部.这似乎主要发生在训练集队列仍然被填充的同时填充验证集队列时(因为在验证集评估期间训练队列是打开的).例如,如果我在迭代0评估验证集(当两个队列仍然需要填充时),这种情况经常发生.尽管它们在不同的会话中运行,但似乎训练/验证队列共享一些全局状态.

有人可以解释为什么会发生这种情况,以及如何在考虑上述假设[A1] - [A5]的同时更清楚地解决这个问题?

Alb*_*ert 2

我目前面临类似的问题。到目前为止,我完全避免了任何队列,只是通过输入数据,feed_dict但由于不使用队列和并行性,我显然失去了一些性能(尽管我仍然对当前的速度感到满意,因为我之前在 Theano 中做了同样的事情) 。现在我想重新设计它并使用队列并偶然发现了这个问题。有这个这个这个相关的问题。

我目前正在考虑这样做:

  • 在训练中,我想使用 aRandomShuffleQueue这使得它变得更加复杂。我想我会忽略这个问题,一旦将张量排入队列的读取器线程完成,我会让训练停止,所以我会丢失这个纪元的剩余最新项目,capacity并将其用于下一个纪元。也许为了使其具有确定性,我检查了仍然从队列中读取的火车线程,直到只剩下min_after_dequeue项目。

  • 在评估中,我想使用相同的图表和相同的会话。我可以使用tf.cond从另一个单独的队列而不是RandomShuffleQueue. feed_dict或者我可以在评估中使用。如果我要使用单独的队列,我会使用FIFOQueue并仔细跟踪我执行了正确数量的步骤。我还可以引入另一个虚拟张量,我将其排入队列,该队列给我一个end_of_epoch标志左右,这样我就知道在评估线程中何时停止。


在 TensorFlow 1.2 中,将会有tf.contrib.data接口(问题评论文档概述API 文档),它提供的tf.contrib.data.DatasetAPI 也支持类似的 shuffletf.RandomShuffleQueue以及批处理和多个 epoch 的循环。此外,您还可以通过在数据上创建迭代器来访问数据,并且可以重置迭代器。一些相关的 StackOverflow 问题位于此处此处