创建keras回调以在培训期间保存每个批次的模型预测和目标

Lem*_*mon 17 callback keras tensorflow

我正在Keras(tensorflow后端)构建一个简单的Sequential模型.在培训期间,我想检查各个培训批次和模型预测.因此,我正在尝试创建一个自定义Callback,以保存每个培训批次的模型预测和目标.但是,该模型不使用当前批次进行预测,而是使用整个训练数据.

我怎样才能将当前的培训批次交给Callback

我如何访问Callbackself.predhis和self.targets中保存的批次和目标?

我当前的版本如下:

callback_list = [prediction_history((self.x_train, self.y_train))]

self.model.fit(self.x_train, self.y_train, batch_size=self.batch_size, epochs=self.n_epochs, validation_data=(self.x_val, self.y_val), callbacks=callback_list)

class prediction_history(keras.callbacks.Callback):
    def __init__(self, train_data):
        self.train_data = train_data
        self.predhis = []
        self.targets = []

    def on_batch_end(self, epoch, logs={}):
        x_train, y_train = self.train_data
        self.targets.append(y_train)
        prediction = self.model.predict(x_train)
        self.predhis.append(prediction)
        tf.logging.info("Prediction shape: {}".format(prediction.shape))
        tf.logging.info("Targets shape: {}".format(y_train.shape))
Run Code Online (Sandbox Code Playgroud)

Yu-*_*ang 19

注意:原始接受的答案是错误的,如评论中所指出的那样.由于它被接受并且无法删除,我已经重写它以提供有效的答案.


模型编译后,占位符张量为y_truein model.targetsy_predin model.outputs.

要在每个批次中保存这些占位符的值,您可以:

  1. 首先将这些张量的值复制到变量中.
  2. 评估这些变量on_batch_end,并存储生成的数组.

现在步骤1有点涉及,因为你必须tf.assign在训练函数中添加一个操作model.train_function.使用当前的Keras API,可以通过提供构造训练函数的fetches参数来完成K.function().

model._make_train_function(),有一条线:

self.train_function = K.function(inputs,
                                 [self.total_loss] + self.metrics_tensors,
                                 updates=updates,
                                 name='train_function',
                                 **self._function_kwargs)
Run Code Online (Sandbox Code Playgroud)

fetches包含tf.assignops 的参数可以通过model._function_kwargs(仅在Keras 2.1.0之后工作)提供.

举个例子:

from keras.callbacks import Callback
from keras import backend as K
import tensorflow as tf

class CollectOutputAndTarget(Callback):
    def __init__(self):
        super(CollectOutputAndTarget, self).__init__()
        self.targets = []  # collect y_true batches
        self.outputs = []  # collect y_pred batches

        # the shape of these 2 variables will change according to batch shape
        # to handle the "last batch", specify `validate_shape=False`
        self.var_y_true = tf.Variable(0., validate_shape=False)
        self.var_y_pred = tf.Variable(0., validate_shape=False)

    def on_batch_end(self, batch, logs=None):
        # evaluate the variables and save them into lists
        self.targets.append(K.eval(self.var_y_true))
        self.outputs.append(K.eval(self.var_y_pred))

# build a simple model
# have to compile first for model.targets and model.outputs to be prepared
model = Sequential([Dense(5, input_shape=(10,))])
model.compile(loss='mse', optimizer='adam')

# initialize the variables and the `tf.assign` ops
cbk = CollectOutputAndTarget()
fetches = [tf.assign(cbk.var_y_true, model.targets[0], validate_shape=False),
           tf.assign(cbk.var_y_pred, model.outputs[0], validate_shape=False)]
model._function_kwargs = {'fetches': fetches}  # use `model._function_kwargs` if using `Model` instead of `Sequential`

# fit the model and check results
X = np.random.rand(10, 10)
Y = np.random.rand(10, 5)
model.fit(X, Y, batch_size=8, callbacks=[cbk])
Run Code Online (Sandbox Code Playgroud)

除非样品数量可以除以批量大小,否则最终批次的大小将与其他批次不同.所以K.variable()K.update()不能在这种情况下使用.你必须使用tf.Variable(..., validate_shape=False)tf.assign(..., validate_shape=False)不是.


要验证已保存数组的正确性,可以添加一行training.py以打印出混洗索引数组:

if shuffle == 'batch':
    index_array = _batch_shuffle(index_array, batch_size)
elif shuffle:
    np.random.shuffle(index_array)

print('Index array:', repr(index_array))  # Add this line

batches = _make_batches(num_train_samples, batch_size)
Run Code Online (Sandbox Code Playgroud)

在拟合期间应打印出混洗索引数组:

Epoch 1/1
Index array: array([8, 9, 3, 5, 4, 7, 1, 0, 6, 2])
10/10 [==============================] - 0s 23ms/step - loss: 0.5670

你可以检查是否cbk.targets相同Y[index_array]:

index_array = np.array([8, 9, 3, 5, 4, 7, 1, 0, 6, 2])
print(Y[index_array])
[[ 0.75325592  0.64857277  0.1926653   0.7642865   0.38901153]
 [ 0.77567689  0.13573623  0.4902501   0.42897559  0.55825652]
 [ 0.33760938  0.68195038  0.12303088  0.83509441  0.20991668]
 [ 0.98367778  0.61325065  0.28973401  0.28734073  0.93399794]
 [ 0.26097574  0.88219054  0.87951941  0.64887846  0.41996446]
 [ 0.97794604  0.91307569  0.93816428  0.2125808   0.94381495]
 [ 0.74813435  0.08036688  0.38094272  0.83178364  0.16713736]
 [ 0.52609421  0.39218962  0.21022047  0.58569125  0.08012982]
 [ 0.61276627  0.20679494  0.24124858  0.01262245  0.0994412 ]
 [ 0.6026137   0.25620512  0.7398164   0.52558182  0.09955769]]

print(cbk.targets)
[array([[ 0.7532559 ,  0.64857274,  0.19266529,  0.76428652,  0.38901153],
        [ 0.77567691,  0.13573623,  0.49025011,  0.42897558,  0.55825651],
        [ 0.33760938,  0.68195039,  0.12303089,  0.83509439,  0.20991668],
        [ 0.9836778 ,  0.61325067,  0.28973401,  0.28734073,  0.93399793],
        [ 0.26097575,  0.88219053,  0.8795194 ,  0.64887846,  0.41996446],
        [ 0.97794604,  0.91307569,  0.93816429,  0.2125808 ,  0.94381493],
        [ 0.74813437,  0.08036689,  0.38094273,  0.83178365,  0.16713737],
        [ 0.5260942 ,  0.39218962,  0.21022047,  0.58569127,  0.08012982]], dtype=float32),
 array([[ 0.61276627,  0.20679495,  0.24124858,  0.01262245,  0.0994412 ],
        [ 0.60261369,  0.25620511,  0.73981643,  0.52558184,  0.09955769]], dtype=float32)]
Run Code Online (Sandbox Code Playgroud)

如您所见,有两个批次cbk.targets(一个"完整批次",大小为8,最后一批为大小为2),行顺序与Y[index_array].

  • 对于那些遇到与我上面的评论相同的问题的用户,您想要在加载模型后,设置了model._function_kwargs = {'fetches':fetches}`之后但在model之前设置`model.train_function = None`。 .fit()`,因为`model._function_kwargs`值未保存在检查点中。如果`model.train_function = None`,`model.fit()`“重新加载”。有关更多详细信息,请查看“ training.py”中的“ _make_train_function”函数。 (2认同)

ber*_*ers 9

编辑(几乎是一个新答案),原因如下:

  • Yu-Yang 2017年的答案依赖于私有_make_train_function_function_kwargsAPI,它们仅在TF1中工作(并且可能在TF1兼容性中,即所谓的非渴望模式)。
  • 同样,Binyan Hu的 2020 年答案默认依赖于_make_test_functionTF2,并且在 TF2 中不起作用(也需要非 Eager 模式)。
  • 我自己的 2020 年 1 月答案已经受到几个必需的配置设置的影响,似乎已经停止与 TF 2.5(或之前)一起使用,并且我无法再制作model.inputs或工作。model.outputs
  • 最后,该答案的早期版本需要进行可能昂贵的模型评估才能获得每批的预测。类似的获取激活直方图的解决方案甚至会导致重复训练不同模型时出现 OOM 问题。

因此,我开始寻找一种方法来批量获取所有可能的数量(输入、目标、预测、激活) ,而不使用任何私有 API。目的是能够调用.numpy()预期的数量,因此 Keras 回调可以运行普通的 Python 代码以简化调试(我想这就是这个问题的主要目的 - 为了获得最大性能,人们可能会尝试集成尽可能多的计算无论如何,都可以进入 TensorFlow 的图操作)。

这是所有解决方案的通用基础模型:

"""Demonstrate batch data access."""
import tensorflow as tf
from tensorflow import keras


class DataCallback(keras.callbacks.Callback):
    """This class is where all implementations differ."""


def tf_nan(dtype):
    """Create NaN variable of proper dtype and variable shape for assign()."""
    return tf.Variable(float("nan"), dtype=dtype, shape=tf.TensorShape(None))


def main():
    """Run main."""
    model = keras.Sequential([keras.layers.Dense(1, input_shape=(2,))])

    callback = DataCallback()

    model.compile(loss="mse", optimizer="adam")
    model.fit(
        x=tf.transpose(tf.range(7.0) + [[0.2], [0.4]]),
        y=tf.transpose(tf.range(7.0) + 10 + [[0.5]]),
        validation_data=(
            tf.transpose(tf.range(11.0) + 30 + [[0.6], [0.7]]),
            tf.transpose(tf.range(11.0) + 40 + [[0.9]]),
        ),
        shuffle=False,
        batch_size=3,
        epochs=2,
        verbose=0,
        callbacks=[callback],
    )
    model.save("tmp.tf")


if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

以下三个片段分别展示了一种可能的解决方案,每种方案都有自己的优点和缺点。核心技巧始终相同:分配 atf.Variable并使用tf.Variable.assign将预期数量从以图形模式运行的一些 Keras 代码导出到回调中。这些方法在回调初始化和(在一种情况下)模型编译方面略有不同,最重要的是它们可以访问的数量,这就是为什么我在每个片段上方总结它们的原因。


自定义指标

使用自定义(假)指标(类似于我 2020 年 1 月的答案),虽然我们似乎无法再访问model.inputsNor model.outputs(甚至model.(_)targets不再存在),但我们可以访问y_truey_pred,它们代表模型目标和输出:

[ ] Inputs/Samples (x)
[ ] Weights (w)
[+] Targets/Labels (y_true)
[+] Outputs/Predictions (y_pred)
[ ] All layers (or only final input/output layers)
Run Code Online (Sandbox Code Playgroud)
[ ] Inputs/Samples (x)
[ ] Weights (w)
[+] Targets/Labels (y_true)
[+] Outputs/Predictions (y_pred)
[ ] All layers (or only final input/output layers)
Run Code Online (Sandbox Code Playgroud)

自定义训练步骤

我在这个答案的早期版本中使用了自定义训练步骤。这个想法原则上仍然有效,但y_pred可能会很昂贵,如果需要的话,使用自定义指标(见上文)可能是有意义的。

[+] Inputs/Samples (x)
[+] Weights (w)
[+] Targets/Labels (y_true)
[~] Outputs/Predictions (y_pred) [expensive!]
[ ] All layers (or only final input/output layers)
Run Code Online (Sandbox Code Playgroud)
"""Demonstrate batch data access using a custom metric."""
import tensorflow as tf
from tensorflow import keras


class DataCallback(keras.callbacks.Callback):  # diff
    """Callback to operate on batch data from metric."""

    def __init__(self):
        """Offer a metric to access batch data."""
        super().__init__()

        self.y_true = None
        self.y_pred = None

    def set_model(self, model):
        """Initialize variables when model is set."""
        self.y_true = tf_nan(model.output.dtype)
        self.y_pred = tf_nan(model.output.dtype)

    def metric(self, y_true, y_pred):
        """Fake metric."""
        self.y_true.assign(y_true)
        self.y_pred.assign(y_pred)

        return 0

    def on_train_batch_end(self, _batch, _logs=None):
        """See keras.callbacks.Callback.on_train_batch_end."""
        print("y_true =", self.y_true.numpy())
        print("y_pred =", self.y_pred.numpy())

    def on_train_end(self, _logs=None):
        """Clean up."""
        del self.y_true, self.y_pred


def tf_nan(dtype):
    """Create NaN variable of proper dtype and variable shape for assign()."""
    return tf.Variable(float("nan"), dtype=dtype, shape=tf.TensorShape(None))


def main():
    """Run main."""
    model = keras.Sequential([keras.layers.Dense(1, input_shape=(2,))])

    callback = DataCallback()

    model.compile(loss="mse", optimizer="adam", metrics=[callback.metric])  # diff
    model.fit(
        x=tf.transpose(tf.range(7.0) + [[0.2], [0.4]]),
        y=tf.transpose(tf.range(7.0) + 10 + [[0.5]]),
        validation_data=(
            tf.transpose(tf.range(11.0) + 30 + [[0.6], [0.7]]),
            tf.transpose(tf.range(11.0) + 40 + [[0.9]]),
        ),
        shuffle=False,
        batch_size=3,
        epochs=2,
        verbose=0,
        callbacks=[callback],
    )
    model.save("tmp.tf")


if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

自定义层调用

自定义层调用是访问每个层的输入和输出的超级灵活的方式。回调处理call层列表的函数修补。虽然我们无法访问权重和目标(因为这些数量在各个层的级别上没有意义),但它允许我们访问各个层的激活,这对于解决诸如如何使用“ tf.keras 记录激活”之类的问题非常方便.callbacks.TensorBoard`?

[+] Inputs/Samples (x)
[ ] Weights (w)
[ ] Targets/Labels (y_true)
[+] Outputs/Predictions (y_pred)
[+] All layers (or only final input/output layers)
Run Code Online (Sandbox Code Playgroud)
[+] Inputs/Samples (x)
[+] Weights (w)
[+] Targets/Labels (y_true)
[~] Outputs/Predictions (y_pred) [expensive!]
[ ] All layers (or only final input/output layers)
Run Code Online (Sandbox Code Playgroud)

何时使用which并打开待办事项

我认为每个解决方案上面的片段很好地总结了每种方法的功能。一般来说,

  • 自定义训练步骤将是访问模型输入的理想选择,例如批量数据集生成器、洗牌效果等;
  • 自定义层调用是访问模型中间层的理想选择;和
  • 自定义指标是访问模型输出的理想选择。

我相当确定(但没有尝试过)可以结合所有方法来同时访问所有批次数量。除了训练模式之外,我没有测试过任何东西 - 每种方法在测试或预测模式中的有用性都有进一步的优点和缺点。最后,我假设(但也没有测试过)它们应该只是tf.keras和之间的细微差别keras。在 TF2.8.rc1 和 Keras 2.8.0 上测试了这段代码,并将代码移tf.keras回到keraspip 包中,并且没有使用任何私有 API,我相信这个假设是合理的。

如果这种方法可以扩展到访问model.inputsmodel.outputs再次访问,那就太好了。目前,我收到如下错误:

TypeError:您将 KerasTensor(...)(中间 Keras 符号输入/输出)传递给 TF API,该 API 不允许注册自定义调度程序,例如tf.condtf.function、梯度磁带或tf.map_fn. Keras 功能模型构建仅支持支持调度的 TF API 调用,例如tf.math.addtf.reshape。其他 API 无法直接在符号 Keras 输入/输出上调用。您可以通过将操作放入自定义 Keras 层call并在此符号输入/输出上调用该层来解决此限制。


之前的回答

从 TF 2.2 开始,您可以使用自定义训练步骤而不是回调来实现您想要的效果。这是一个与 一起使用的演示tensorflow==2.2.0rc1,使用继承来改进keras.Sequential模型。从性能角度来看,这并不理想,因为预测会进行两次,一次 inself(x, training=True)一次 in super().train_step(data)。但你明白了。

这在 eager 模式下工作并且不使用私有 API,因此它应该非常稳定。需要注意的是,您必须使用tf.keras(独立版keras不支持Model.train_step),但我觉得独立版keras无论如何都变得越来越不受欢迎。(事实上​​,tf.keras迁移到了kerasTF2.8。)

"""Demonstrate batch data access using a custom training step."""
import tensorflow as tf
from tensorflow import keras


class DataCallback(keras.callbacks.Callback):  # diff
    """Callback to operate on batch data from training step."""

    def __init__(self):
        """Initialize tf.Variables."""
        super().__init__()

        self.x = None
        self.w = None
        self.y_true = None
        self.y_pred = None

    def set_model(self, model):
        """Wrap the model.train_step function to access training batch data."""
        self.x = tf_nan(model.input.dtype)
        # pylint:disable=protected-access (replace by proper dtype if you know it)
        if model.compiled_loss._user_loss_weights is not None:
            self.w = tf_nan(model.compiled_loss._user_loss_weights.dtype)
        self.y_true = tf_nan(model.output.dtype)
        self.y_pred = tf_nan(model.output.dtype)

        model_train_step = model.train_step

        def outer_train_step(data):
            # https://github.com/keras-team/keras/blob/v2.7.0/keras/engine/training.py
            x, y_true, w = keras.utils.unpack_x_y_sample_weight(data)

            self.x.assign(x)
            if w is not None:
                self.w.assign(w)
            self.y_true.assign(y_true)

            result = model_train_step(data)

            y_pred = model(x)
            self.y_pred.assign(y_pred)

            return result

        model.train_step = outer_train_step

    def on_train_batch_end(self, _batch, _logs=None):
        """See keras.callbacks.Callback.on_train_batch_end."""
        print("x =", self.x.numpy())
        if self.w is not None:
            print("w =", self.w.numpy())
        print("y_true =", self.y_true.numpy())
        print("y_pred =", self.y_pred.numpy())

    def on_train_end(self, _logs=None):
        """Clean up."""
        del self.x, self.w, self.y_true, self.y_pred


def tf_nan(dtype):
    """Create NaN variable of proper dtype and variable shape for assign()."""
    return tf.Variable(float("nan"), dtype=dtype, shape=tf.TensorShape(None))


def main():
    """Run main."""
    model = keras.Sequential([keras.layers.Dense(1, input_shape=(2,))])

    callback = DataCallback()

    model.compile(loss="mse", optimizer="adam")
    model.fit(
        x=tf.transpose(tf.range(7.0) + [[0.2], [0.4]]),
        y=tf.transpose(tf.range(7.0) + 10 + [[0.5]]),
        validation_data=(
            tf.transpose(tf.range(11.0) + 30 + [[0.6], [0.7]]),
            tf.transpose(tf.range(11.0) + 40 + [[0.9]]),
        ),
        shuffle=False,
        batch_size=3,
        epochs=2,
        verbose=0,
        callbacks=[callback],
    )
    model.save("tmp.tf")


if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

最后,这是一个没有继承的更简单的例子:

[+] Inputs/Samples (x)
[ ] Weights (w)
[ ] Targets/Labels (y_true)
[+] Outputs/Predictions (y_pred)
[+] All layers (or only final input/output layers)
Run Code Online (Sandbox Code Playgroud)