RealNVP / normalizing-flow layer — keras, tensorflow 2.0
是否有更好的方法来构建 RealNVP 层以用作 tensorflow 2.0 中的标准可训练层?我最终将它包装在一个模型中。对于图层,变量没有出现在 trainable_variables 中。
像这样运行,但我怀疑有更好的方法:
from pylab import *
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_probability as tfp
tfb = tfp.bijectors
tfd = tfp.distributions
class NVPLayer(tf.keras.models.Model):
    """A RealNVP bijector wrapped as a trainable Keras model.

    The forward pass transforms inputs through the RealNVP bijector and, as a
    side effect, stores the mean log-likelihood of the inputs under the
    transformed distribution in ``self.loss`` (the training loop negates it to
    get the NLL).

    Args:
        output_dim: Dimensionality of the data (kept for reference; the base
            distribution below is hard-wired to 3-D).
        num_masked: Number of input dimensions left untouched by the RealNVP
            coupling layer.
    """

    def __init__(self, *, output_dim, num_masked, **kwargs):
        super().__init__(**kwargs)
        self.output_dim = output_dim
        self.num_masked = num_masked
        # Build the shift/log-scale template once so its variables are
        # created (and tracked by Keras) a single time.
        self.shift_and_log_scale_fn = tfb.real_nvp_default_template(
            hidden_layers=[2],
            activation=None,  # linear
        )
        # FIX: construct the transformed distribution once here instead of
        # rebuilding it on every call() — rebuilding per forward pass is
        # wasteful and delays variable creation until the first call.
        self.nvp = tfd.TransformedDistribution(
            distribution=tfd.MultivariateNormalDiag(loc=[0., 0., 0.]),
            bijector=tfb.RealNVP(
                num_masked=self.num_masked,
                shift_and_log_scale_fn=self.shift_and_log_scale_fn,
            ),
        )
        self.loss = None

    def call(self, *inputs):
        """Transform ``inputs`` forward; cache the mean log-prob in ``self.loss``."""
        # NOTE(review): assigning to ``self.loss`` shadows Keras' built-in
        # ``Model.loss``; ``self.add_loss(...)`` would be the idiomatic
        # alternative, but the attribute is kept for the training loop below.
        self.loss = tf.reduce_mean(self.nvp.log_prob(*inputs))
        return self.nvp.bijector.forward(*inputs)
# --- Train the RealNVP layer by gradient descent on the negative log-likelihood ---
layer = NVPLayer(output_dim=3, num_masked=1)

# 100 samples from a diagonal Gaussian with per-dimension scale and offset.
x = (np.random.randn(100, 3) * np.array([1, 3, 5]) + np.array([-3, -10, 4])).astype(np.float32)

z0 = layer(x).numpy()  # forward mapping before training, kept for comparison

optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
for step in range(1000):
    with tf.GradientTape() as tape:
        _ = layer(x)        # side effect: refreshes layer.loss (mean log-prob)
        loss = -layer.loss  # negative log-likelihood to minimize
    print(loss)
    grads = tape.gradient(loss, layer.trainable_variables)
    optimizer.apply_gradients(zip(grads, layer.trainable_variables))

z1 = layer(x).numpy()  # forward mapping after training
print(pd.DataFrame(z0).describe())
print(pd.DataFrame(z1).describe())
Run Code Online (Sandbox Code Playground)
小智 5
感谢您提供代码,我用它作为起点(tf 2.0rc0 和 tfp 0.8rc0)。我玩了一会儿,发现你可以让训练更快,把它包装成@tf.function。
但是,当您这样做时,您必须将双射器和 TransformedDistribution 内容拉入 init 部分。否则,您以后无法访问它们。我使用了与您不同的双射器,但原理保持不变。
但我不知道这是否是最好的方法。
from pylab import *
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_probability as tfp
tfb = tfp.bijectors
tfd = tfp.distributions
class MAF(tf.keras.models.Model):
    """A 2-D masked autoregressive flow built from a chain of MAF bijectors.

    Five MaskedAutoregressiveFlow layers are interleaved with Permute
    bijectors (the final Permute is dropped) and wrapped in a
    TransformedDistribution over a standard 2-D Gaussian base.

    Args:
        output_dim: Dimensionality of the data (kept for reference; the base
            distribution below is hard-wired to 2-D).
        num_masked: Kept for interface compatibility; unused by MAF.
    """

    def __init__(self, *, output_dim, num_masked, **kwargs):
        super().__init__(**kwargs)
        self.output_dim = output_dim
        self.num_masked = num_masked
        num_bijectors = 5
        bijectors = []
        self.shift_and_log_scale_fns = []
        for _ in range(num_bijectors):
            # BUG FIX: the original created ONE template and reused it for all
            # five MAF layers, so every layer silently shared the same
            # weights. Each bijector now gets its own template (own variables).
            fn = tfb.masked_autoregressive_default_template(hidden_layers=[128, 128])
            self.shift_and_log_scale_fns.append(fn)
            bijectors.append(tfb.MaskedAutoregressiveFlow(shift_and_log_scale_fn=fn))
            bijectors.append(tfb.Permute(permutation=[1, 0]))
        # Backward-compatible alias for callers that read the old attribute.
        self.shift_and_log_scale_fn = self.shift_and_log_scale_fns[0]
        # Discard the trailing Permute, then reverse: Chain applies right-to-left.
        bijector = tfb.Chain(list(reversed(bijectors[:-1])))
        # Defining the flow in __init__ keeps it accessible after tf.function
        # wrapping of the training step.
        self.flow = tfd.TransformedDistribution(
            distribution=tfd.MultivariateNormalDiag(loc=[0., 0.]),
            bijector=bijector)

    def call(self, *inputs):
        """Map base-space inputs forward through the flow."""
        return self.flow.bijector.forward(*inputs)

    def getFlow(self, num):
        """Draw ``num`` samples from the learned distribution."""
        return self.flow.sample(num)
# BUG FIX: the original snippet used ``X`` without ever defining it, which
# raises NameError. Define a toy 2-D training set matching the flow's 2-D
# base distribution.
X = np.random.randn(1000, 2).astype(np.float32)
print(X.shape)

model = MAF(output_dim=2, num_masked=1)
# model.summary() before the first call would error: Keras builds the model
# lazily, so call it once first.
_ = model(X)
model.summary()

optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

@tf.function  # graph-compiling the step makes training about 10x faster
def train_step(X):
    """One NLL gradient step on the whole batch; returns the scalar loss."""
    with tf.GradientTape() as tape:
        predictions = model(X)
        loss = -tf.reduce_mean(model.flow.log_prob(X))
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

# Training loop with periodic timing/loss reports.
from time import time
start = time()
for i in range(1001):
    loss = train_step(X)
    if i % 50 == 0:
        print(i, " ", loss.numpy(), (time() - start))
        start = time()

# Sample from the trained model and visualize.
# NOTE(review): ``plt`` is assumed to come from ``from pylab import *`` —
# confirm, or add ``import matplotlib.pyplot as plt`` explicitly.
XF = model.flow.sample(10000)
plt.scatter(XF[:, 0], XF[:, 1], s=5, color='blue')
Run Code Online (Sandbox Code Playground)
该笔记本可以在这里找到 https://github.com/tensorchiefs/dl_book_playground/blob/master/flow/Flow_101_learning_parameters.ipynb ，或使用 colab 打开：https://colab.research.google.com/github/tensorchiefs/dl_book_playground/blob/master/flow/Flow_101_learning_parameters.ipynb
| 归档时间: | |
| 查看次数: | 1034 次 |
| 最近记录: | |