Keras,级联多个 RNN 模型用于 N 维输出

Omn*_*ity 6 python keras tensorflow recurrent-neural-network tf.keras

我在以一种不寻常的方式将两个模型链接在一起时遇到了一些困难。

我正在尝试复制以下流程图:

级联 RNN 2-D

为清楚起见,在Model[0]我尝试从IR[i](中间表示)生成整个时间序列作为使用Model[1]. 该方案的目的是允许从一维输入生成参差不齐的二维时间序列(同时允许在不需要该时间步长的输出时省略第二个模型,并且不需要Model[0]不断地“在接受输入和生成输出之间切换模式)。

我假设需要一个自定义训练循环,并且我已经有一个自定义训练循环来处理第一个模型中的状态(以前的版本在每个时间步只有一个输出)。如图所示,第二个模型应该有相当短的输出(能够限制在少于 10 个时间步长)。

但归根结底,虽然我可以考虑我想做的事情,但我对 Keras 和/或 Tensorflow 还不够熟练,无法实际实现它。(事实上​​,这是我与图书馆的第一个非玩具项目。)

我在文献中搜索了类似的鹦鹉方案或示例代码,但没有成功。我什至不知道这个想法在 TF/Keras 中是否可行。

我已经让这两个模型独立工作。(正如我已经计算出维度,并使用虚拟数据进行了一些训练以获得第二个模型的垃圾输出,第一个模型基于此问题的先前迭代并且已经过全面训练。)如果我有Model[0]Model[1]作为python变量(让我们称它们为model_amodel_b),那么我将如何将它们链接在一起来做到这一点?

编辑添加:

如果这一切都不清楚,也许拥有每个输入和输出的维度会有所帮助:

每个输入和输出的维度是:

输入:(batch_size, model_a_timesteps, input_size)
红外:(batch_size, model_a_timesteps, ir_size)

IR[i] (复制后): (batch_size, model_b_timesteps, ir_size)
Out[i]: (batch_size, model_b_timesteps, output_size)
Out:(batch_size, model_a_timesteps, model_b_timesteps, output_size)

Ove*_*gon 5

由于这个问题有多个主要部分,因此我专门针对核心挑战进行了问答:有状态反向传播。这个答案的重点是实现可变输出步长。


描述

  • 正如案例 5 所验证的,我们可以采用自下而上优先的方法。首先,我们将完整的输入提供给model_a(A) - 然后,将其输出作为输入提供给model_b(B),但这次一次一步
  • 请注意,我们必须将 B 的输出步骤链接A 的输入步骤,而不是在A 的输入步骤之间链接;Out[0][1]即,在您的图中,梯度是在和之间流动Out[0][0],而不是在Out[2][0]和之间流动Out[0][1]
  • 对于计算损失,我们使用不规则张量还是填充张量并不重要;然而,我们必须使用填充张量来写入 TensorArray。
  • 下面代码中的循环逻辑是通用的;然而,为了简单起见,特定的属性处理和隐藏状态传递是硬编码的,但为了通用性可以重写。

代码:在底部。


例子

  • 在这里,我们预定义了 A 的每个输入的 B 迭代次数,但我们可以实现任意停止逻辑。例如,我们可以将DenseB 层的输出作为隐藏状态,并检查其 L2 范数是否超过阈值。
  • 根据上面的内容,如果longest_step我们不知道,我们可以简单地设置它,这对于 NLP 和其他带有 STOP 标记的任务来说很常见。
    • TensorArrays或者,我们可以用 ; 在每个 A 的输入处编写分隔符dynamic_size=True。请参阅下面的“不确定点”。
  • 一个合理的担忧是,我们如何知道梯度正确流动?请注意,我们已经在链接的问答中验证了它们的垂直和水平,但它没有涵盖每个输入步骤的多个输出步骤,对于多个输入步骤。见下文。

不确定点:我不完全确定egOut[0][1]和之间的梯度是否相互作用Out[2][0]。然而,我确实验证了如果我们将B 的输出与 A 的输入分开(情况 2),梯度不会水平流动;TensorArray重新实现案例 4 和 5,两种模型的梯度都会有所不同,包括具有完整单水平通道的较低模型。

因此我们必须写一个统一的TensorArray. 为此,由于没有从 egIR[1]到 的操作Out[0][1],我看不出 TF 如何追踪它 - 所以看来我们是安全的。但请注意,在下面的示例中,使用steps_at_t=[1]*6 将使梯度在两个模型中水平流动,因为我们正在写入单个TensorArray并传递隐藏状态。

然而,所检查的情况很混乱,B 在所有步骤都是有状态的;解除此要求后,我们可能不需要TensorArray为所有Out[0]、等编写一个统一的文件Out[1],但我们仍然必须对我们知道有效的东西进行测试,这不再那么简单。


示例[代码]

import numpy as np
import tensorflow as tf

#%%# Make data & models, then fit ###########################################
x0 = y0 = tf.constant(np.random.randn(2, 3, 4))
msn = MultiStatefulNetwork(batch_shape=(2, 3, 4), steps_at_t=[3, 4, 2])

#%%#############################################
with tf.GradientTape(persistent=True) as tape:
    outputs = msn(x0)
    # shape: (3, 4, 2, 4), 0-padded
    # We can pad labels accordingly.
    # Note the (2, 4) model_b's output shape, which is a timestep slice;
    # model_b is a *slice model*. Careful in implementing various logics
    # which are and aren't intended to be stateful.
Run Code Online (Sandbox Code Playgroud)

方法

不是最干净、也不是最优化的代码,但它可以工作;改进的余地。

更重要的是:我在 Eager 中实现了这一点,并且不知道它在 Graph 中如何工作,并且使其同时适用于两者可能非常棘手。如果需要,只需在图表中运行并比较“案例”中所做的所有值。

# ideally we won't `import tensorflow` at all; kept for code simplicity
import tensorflow as tf
from tensorflow.python.util import nest
from tensorflow.python.ops import array_ops, tensor_array_ops
from tensorflow.python.framework import ops

from tensorflow.keras.layers import Input, SimpleRNN, SimpleRNNCell
from tensorflow.keras.models import Model

#######################################################################
class MultiStatefulNetwork():
    def __init__(self, batch_shape=(2, 6, 4), steps_at_t=[]):
        self.batch_shape=batch_shape
        self.steps_at_t=steps_at_t

        self.batch_size = batch_shape[0]
        self.units = batch_shape[-1]
        self._build_models()

    def __call__(self, inputs):
        outputs = self._forward_pass_a(inputs)
        outputs = self._forward_pass_b(outputs)
        return outputs

    def _forward_pass_a(self, inputs):
        return self.model_a(inputs, training=True)

    def _forward_pass_b(self, inputs):
        return model_rnn_outer(self.model_b, inputs, self.steps_at_t)

    def _build_models(self):
        ipt = Input(batch_shape=self.batch_shape)
        out = SimpleRNN(self.units, return_sequences=True)(ipt)
        self.model_a = Model(ipt, out)

        ipt  = Input(batch_shape=(self.batch_size, self.units))
        sipt = Input(batch_shape=(self.batch_size, self.units))
        out, state = SimpleRNNCell(4)(ipt, sipt)
        self.model_b = Model([ipt, sipt], [out, state])

        self.model_a.compile('sgd', 'mse')
        self.model_b.compile('sgd', 'mse')


def inner_pass(model, inputs, states):
    return model_rnn(model, inputs, states)


def model_rnn_outer(model, inputs, steps_at_t=[2, 2, 4, 3]):
    def outer_step_function(inputs, states):
        x, steps = inputs
        x = array_ops.expand_dims(x, 0)
        x = array_ops.tile(x, [steps, *[1] * (x.ndim - 1)])  # repeat steps times
        output, new_states = inner_pass(model, x, states)
        return output, new_states

    (outer_steps, steps_at_t, longest_step, outer_t, initial_states,
     output_ta, input_ta) = _process_args_outer(model, inputs, steps_at_t)

    def _outer_step(outer_t, output_ta_t, *states):
        current_input = [input_ta.read(outer_t), steps_at_t.read(outer_t)]
        output, new_states = outer_step_function(current_input, tuple(states))

        # pad if shorter than longest_step.
        # model_b may output twice, but longest in `steps_at_t` is 4; then we need
        # output.shape == (2, *model_b.output_shape) -> (4, *...)
        # checking directly on `output` is more reliable than from `steps_at_t`
        output = tf.cond(
            tf.math.less(output.shape[0], longest_step),
            lambda: tf.pad(output, [[0, longest_step - output.shape[0]],
                                    *[[0, 0]] * (output.ndim - 1)]),
            lambda: output)

        output_ta_t = output_ta_t.write(outer_t, output)
        return (outer_t + 1, output_ta_t) + tuple(new_states)

    final_outputs = tf.while_loop(
        body=_outer_step,
        loop_vars=(outer_t, output_ta) + initial_states,
        cond=lambda outer_t, *_: tf.math.less(outer_t, outer_steps))

    output_ta = final_outputs[1]
    outputs = output_ta.stack()
    return outputs


def _process_args_outer(model, inputs, steps_at_t):
    def swap_batch_timestep(input_t):
        # Swap the batch and timestep dim for the incoming tensor.
        # (samples, timesteps, channels) -> (timesteps, samples, channels)
        # iterating dim0 to feed (samples, channels) slices expected by RNN
        axes = list(range(len(input_t.shape)))
        axes[0], axes[1] = 1, 0
        return array_ops.transpose(input_t, axes)

    inputs = nest.map_structure(swap_batch_timestep, inputs)

    assert inputs.shape[0] == len(steps_at_t)
    outer_steps = array_ops.shape(inputs)[0]  # model_a_steps
    longest_step = max(steps_at_t)
    steps_at_t = tensor_array_ops.TensorArray(
        dtype=tf.int32, size=len(steps_at_t)).unstack(steps_at_t)

    # assume single-input network, excluding states which are handled separately
    input_ta = tensor_array_ops.TensorArray(
        dtype=inputs.dtype,
        size=outer_steps,
        element_shape=tf.TensorShape(model.input_shape[0]),
        tensor_array_name='outer_input_ta_0').unstack(inputs)

    # TensorArray is used to write outputs at every timestep, but does not
    # support RaggedTensor; thus we must make TensorArray such that column length
    # is that of the longest outer step, # and pad model_b's outputs accordingly
    element_shape = tf.TensorShape((longest_step, *model.output_shape[0]))

    # overall shape: (outer_steps, longest_step, *model_b.output_shape)
    # for every input / at each step we write in dim0 (outer_steps)
    output_ta = tensor_array_ops.TensorArray(
        dtype=model.output[0].dtype,
        size=outer_steps,
        element_shape=element_shape,
        tensor_array_name='outer_output_ta_0')

    outer_t = tf.constant(0, dtype='int32')
    initial_states = (tf.zeros(model.input_shape[0], dtype='float32'),)

    return (outer_steps, steps_at_t, longest_step, outer_t, initial_states,
            output_ta, input_ta)


def model_rnn(model, inputs, states):
    def step_function(inputs, states):
        output, new_states = model([inputs, *states], training=True)
        return output, new_states

    initial_states = states
    input_ta, output_ta, time, time_steps_t = _process_args(model, inputs)

    def _step(time, output_ta_t, *states):
        current_input = input_ta.read(time)
        output, new_states = step_function(current_input, tuple(states))

        flat_state = nest.flatten(states)
        flat_new_state = nest.flatten(new_states)
        for state, new_state in zip(flat_state, flat_new_state):
            if isinstance(new_state, ops.Tensor):
                new_state.set_shape(state.shape)

        output_ta_t = output_ta_t.write(time, output)
        new_states = nest.pack_sequence_as(initial_states, flat_new_state)
        return (time + 1, output_ta_t) + tuple(new_states)

    final_outputs = tf.while_loop(
        body=_step,
        loop_vars=(time, output_ta) + tuple(initial_states),
        cond=lambda time, *_: tf.math.less(time, time_steps_t))

    new_states = final_outputs[2:]
    output_ta = final_outputs[1]
    outputs = output_ta.stack()
    return outputs, new_states


def _process_args(model, inputs):
    time_steps_t = tf.constant(inputs.shape[0], dtype='int32')

    # assume single-input network (excluding states)
    input_ta = tensor_array_ops.TensorArray(
        dtype=inputs.dtype,
        size=time_steps_t,
        tensor_array_name='input_ta_0').unstack(inputs)

    # assume single-output network (excluding states)
    output_ta = tensor_array_ops.TensorArray(
        dtype=model.output[0].dtype,
        size=time_steps_t,
        element_shape=tf.TensorShape(model.output_shape[0]),
        tensor_array_name='output_ta_0')

    time = tf.constant(0, dtype='int32', name='time')
    return input_ta, output_ta, time, time_steps_t
Run Code Online (Sandbox Code Playgroud)