
noxer.gm.vae module

This module implements a conditional variational autoencoder (VAE) generator with Keras.

Reference: "Auto-Encoding Variational Bayes" https://arxiv.org/abs/1312.6114

'''This module implements a conditional variational autoencoder (VAE) generator with Keras.

Reference: "Auto-Encoding Variational Bayes" https://arxiv.org/abs/1312.6114
'''
import numpy as np

from .base import GeneratorBase
from sklearn.base import BaseEstimator, TransformerMixin

from keras.layers import Input, Dense, Lambda, Layer
from keras.models import Model
from keras import backend as K
from keras import metrics
from keras.optimizers import Adam
from keras.layers.merge import Concatenate
from noxer.rnn import make_keras_picklable

make_keras_picklable()

class VaeGenerator(GeneratorBase):
    def __init__(self, latent_dim=2, intermediate_dim=256,
                 batch_size=100, epochs=50, D=1.0, epsilon_std=1.0,
                 lr=0.001, beta_1=0.9, beta_2=0.999):

        super(VaeGenerator, self).__init__()

        self.latent_dim = latent_dim
        self.batch_size = batch_size
        self.epochs = epochs
        self.intermediate_dim = intermediate_dim
        self.D = D
        self.epsilon_std = epsilon_std

        self.lr = lr
        self.beta_1 = beta_1
        self.beta_2 = beta_2

    def fit(self, X, Y, **kwargs):

        self.intermediate_dim = int(self.intermediate_dim)
        condition_dim = X.shape[-1]
        original_dim = Y.shape[-1]
        self.condition_dim = condition_dim

        Y_smpl = Input(shape=(original_dim,))
        X_cond = Input(shape=(condition_dim,))
        R_norm = Input(shape=(self.latent_dim,))

        YX_conc = Concatenate()([Y_smpl, X_cond])
        h = Dense(self.intermediate_dim, activation='relu')(YX_conc)
        z_mean = Dense(self.latent_dim)(h)
        z_log_var = Dense(self.latent_dim)(h)

        def sampling(args):
            # Reparameterization trick: z = mu + sigma * epsilon, where
            # epsilon ~ N(0, I) is fed in as an external input (R_norm).
            z_mean, z_log_var, epsilon = args
            return z_mean + K.exp(z_log_var / 2) * epsilon

        # note that "output_shape" isn't necessary with the TensorFlow backend
        latent_g = Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, R_norm])

        # we instantiate these layers separately so as to reuse them later
        decoder_h = Dense(self.intermediate_dim, activation='relu')
        decoder_mean = Dense(original_dim, activation='linear')

        zx = Concatenate()([latent_g, X_cond])

        h_decoded = decoder_h(zx)
        y_decoded_mean = decoder_mean(h_decoded)

        vae_determinizm = self.D

        # Custom loss layer
        class VariationalLossLayer(Layer):
            def __init__(self, **kwargs):
                self.is_placeholder = True
                super(VariationalLossLayer, self).__init__(**kwargs)

            def vae_loss(self, x, x_decoded_mean):
                # Reconstruction term (mean squared error) weighted by D,
                # plus the closed-form KL divergence to the unit Gaussian prior.
                recon_loss = metrics.mean_squared_error(x, x_decoded_mean)
                kl_loss = - 0.5 * K.mean(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
                return K.mean(recon_loss * vae_determinizm + kl_loss)

            def call(self, inputs):
                x, x_decoded_mean, condition = inputs
                loss = self.vae_loss(x, x_decoded_mean)
                self.add_loss(loss, inputs=inputs)
                # We won't actually use the output.
                return x

        y = VariationalLossLayer()([Y_smpl, y_decoded_mean, X_cond])
        vae = Model([R_norm, X_cond, Y_smpl], y)
        vae.compile(
            optimizer=Adam(
                lr=self.lr, beta_1=self.beta_1, beta_2=self.beta_2
            ),
            loss=None
        )

        for i in range(self.epochs):
            R = self._generate_noise(len(X))
            vae.fit([R, X, Y],
                    shuffle=True,
                    epochs=1,
                    batch_size=self.batch_size,
                    verbose=0)

        # build a model to project inputs on the latent space
        self.encoder = Model([Y_smpl, X_cond], z_mean)

        # build a digit generator that can sample from the learned distribution
        decoder_latent = Input(shape=(self.latent_dim,))
        decoder_condit = Input(shape=(self.condition_dim,))

        # The concatenation order must match the training-time order
        # ([latent, condition]) so the decoder weights line up.
        zx = Concatenate()([decoder_latent, decoder_condit])

        _h_decoded = decoder_h(zx)
        _x_decoded_mean = decoder_mean(_h_decoded)

        self.generator = Model([decoder_condit, decoder_latent], _x_decoded_mean)

    def _generate_noise(self, N):
        # Standard normal noise scaled by epsilon_std.
        return np.random.randn(N, self.latent_dim) * self.epsilon_std

    def predict(self, X, *args, **kwargs):
        lat = self._generate_noise(len(X))
        Yp = self.generator.predict([X, lat], verbose=0)
        return Yp
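
For reference, the objective minimized by VariationalLossLayer above can be written as follows (a direct restatement of the code, with d = latent_dim; note that the code averages the KL term over latent dimensions, where the paper sums it):

\mathcal{L}(x, y) = D \cdot \mathrm{MSE}(y, \hat{y})
    - \frac{1}{2d} \sum_{j=1}^{d} \left( 1 + \log \sigma_j^2 - \mu_j^2 - \sigma_j^2 \right),
\qquad z = \mu + \sigma \odot \epsilon, \quad \epsilon \sim \mathcal{N}(0, I)

where \mu = z_mean and \log \sigma^2 = z_log_var are the encoder outputs for the pair (y, x), and \hat{y} is the decoder output.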

Classes

class VaeGenerator

Conditional variational autoencoder (VAE) generator that follows the scikit-learn estimator interface.

Notes

All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).
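
Because every hyperparameter of VaeGenerator is an explicit __init__ keyword argument, the class cooperates with standard scikit-learn utilities; a small sketch using sklearn.base.clone:

from sklearn.base import clone

gen = VaeGenerator(latent_dim=4, D=2.0)
gen_copy = clone(gen)  # rebuilds an unfitted estimator from its parameters
print(gen_copy.get_params()['latent_dim'])  # 4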


Ancestors (in MRO)

  • VaeGenerator
  • noxer.gm.base.GeneratorBase
  • sklearn.base.BaseEstimator
  • builtins.object

Methods

def __init__(self, latent_dim=2, intermediate_dim=256, batch_size=100, epochs=50, D=1.0, epsilon_std=1.0, lr=0.001, beta_1=0.9, beta_2=0.999)

Initialize the generator. latent_dim is the dimensionality of the latent space, intermediate_dim the width of the hidden layers, D the weight of the reconstruction term relative to the KL term, and epsilon_std the standard deviation of the latent sampling noise; lr, beta_1 and beta_2 are the Adam optimizer settings, while batch_size and epochs control training.

def __init__(self, latent_dim=2, intermediate_dim=256,
             batch_size=100, epochs=50, D=1.0, epsilon_std=1.0,
             lr=0.001, beta_1=0.9, beta_2=0.999):
    super(VaeGenerator, self).__init__()
    self.latent_dim = latent_dim
    self.batch_size = batch_size
    self.epochs = epochs
    self.intermediate_dim = intermediate_dim
    self.D = D
    self.epsilon_std = epsilon_std
    self.lr = lr
    self.beta_1 = beta_1
    self.beta_2 = beta_2

def fit(self, X, Y, **kwargs)

Fit generative model to the data.

Parameters

Y : {array-like, sparse matrix}, shape [n_samples, ...]
    The data that should be generated by the model.

X : {array-like, sparse matrix}, shape [n_samples, ...]
    The data used to condition the generative model's outputs.
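
After fit returns, two trained sub-models are exposed as attributes; a sketch continuing the toy example at the top of the page:

gen.fit(X, Y)

# Encoder: maps (Y, X) pairs to latent means, shape (n_samples, latent_dim).
Z_mean = gen.encoder.predict([Y, X])

# Generator: maps (condition, latent code) pairs to generated outputs.
R = np.random.randn(len(X), gen.latent_dim)
Y_gen = gen.generator.predict([X, R])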

def fit(self, X, Y, **kwargs):
    self.intermediate_dim = int(self.intermediate_dim)
    condition_dim = X.shape[-1]
    original_dim = Y.shape[-1]
    self.condition_dim = condition_dim
    Y_smpl = Input(shape=(original_dim,))
    X_cond = Input(shape=(condition_dim,))
    R_norm = Input(shape=(self.latent_dim,))
    YX_conc = Concatenate()([Y_smpl, X_cond])
    h = Dense(self.intermediate_dim, activation='relu')(YX_conc)
    z_mean = Dense(self.latent_dim)(h)
    z_log_var = Dense(self.latent_dim)(h)
    def sampling(args):
        # Reparameterization trick: z = mu + sigma * epsilon, where
        # epsilon ~ N(0, I) is fed in as an external input (R_norm).
        z_mean, z_log_var, epsilon = args
        return z_mean + K.exp(z_log_var / 2) * epsilon
    # note that "output_shape" isn't necessary with the TensorFlow backend
    latent_g = Lambda(sampling, output_shape=(self.latent_dim,))([z_mean, z_log_var, R_norm])
    # we instantiate these layers separately so as to reuse them later
    decoder_h = Dense(self.intermediate_dim, activation='relu')
    decoder_mean = Dense(original_dim, activation='linear')
    zx = Concatenate()([latent_g, X_cond])
    h_decoded = decoder_h(zx)
    y_decoded_mean = decoder_mean(h_decoded)
    vae_determinizm = self.D
    # Custom loss layer
    class VariationalLossLayer(Layer):
        def __init__(self, **kwargs):
            self.is_placeholder = True
            super(VariationalLossLayer, self).__init__(**kwargs)
        def vae_loss(self, x, x_decoded_mean):
            # Reconstruction term (mean squared error) weighted by D,
            # plus the closed-form KL divergence to the unit Gaussian prior.
            recon_loss = metrics.mean_squared_error(x, x_decoded_mean)
            kl_loss = - 0.5 * K.mean(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
            return K.mean(recon_loss * vae_determinizm + kl_loss)
        def call(self, inputs):
            x, x_decoded_mean, condition = inputs
            loss = self.vae_loss(x, x_decoded_mean)
            self.add_loss(loss, inputs=inputs)
            # We won't actually use the output.
            return x
    y = VariationalLossLayer()([Y_smpl, y_decoded_mean, X_cond])
    vae = Model([R_norm, X_cond, Y_smpl], y)
    vae.compile(
        optimizer=Adam(
            lr=self.lr, beta_1=self.beta_1, beta_2=self.beta_2
        ),
        loss=None
    )
    for i in range(self.epochs):
        R = self._generate_noise(len(X))
        vae.fit([R, X, Y],
                shuffle=True,
                epochs=1,
                batch_size=self.batch_size,
                verbose=0)
    # build a model to project inputs on the latent space
    self.encoder = Model([Y_smpl, X_cond], z_mean)
    # build a digit generator that can sample from the learned distribution
    decoder_latent = Input(shape=(self.latent_dim,))
    decoder_condit = Input(shape=(self.condition_dim,))
    # Match the training-time concatenation order ([latent, condition]).
    zx = Concatenate()([decoder_latent, decoder_condit])
    _h_decoded = decoder_h(zx)
    _x_decoded_mean = decoder_mean(_h_decoded)
    self.generator = Model([decoder_condit, decoder_latent], _x_decoded_mean)

def get_params(self, deep=True)

Get parameters for this estimator.

Parameters

deep : boolean, optional
    If True, will return the parameters for this estimator and contained subobjects that are estimators.

Returns

params : mapping of string to any
    Parameter names mapped to their values.
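
A small sketch of typical use:

gen = VaeGenerator(latent_dim=3)
params = gen.get_params()
print(params['latent_dim'])  # 3
print(params['lr'])          # 0.001, the default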

def get_params(self, deep=True):
    """Get parameters for this estimator.
    Parameters
    ----------
    deep : boolean, optional
        If True, will return the parameters for this estimator and
        contained subobjects that are estimators.
    Returns
    -------
    params : mapping of string to any
        Parameter names mapped to their values.
    """
    out = dict()
    for key in self._get_param_names():
        # We need deprecation warnings to always be on in order to
        # catch deprecated param values.
        # This is set in utils/__init__.py but it gets overwritten
        # when running under python3 somehow.
        warnings.simplefilter("always", DeprecationWarning)
        try:
            with warnings.catch_warnings(record=True) as w:
                value = getattr(self, key, None)
            if len(w) and w[0].category == DeprecationWarning:
                # if the parameter is deprecated, don't show it
                continue
        finally:
            warnings.filters.pop(0)
        # XXX: should we rather test if instance of estimator?
        if deep and hasattr(value, 'get_params'):
            deep_items = value.get_params().items()
            out.update((key + '__' + k, val) for k, val in deep_items)
        out[key] = value
    return out

def predict(self, X, *args, **kwargs)

Generate samples with the generative model.

Parameters

X : {array-like, sparse matrix}, shape [n_samples, ...]
    The data used to condition the generative model's outputs.

Returns

Y : {array-like, sparse matrix}, shape [n_samples, ...]
    The data generated by the model.
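
Each call draws fresh latent noise, so repeated calls on the same X return different samples; a sketch continuing the example above:

import numpy as np

samples = np.stack([gen.predict(X) for _ in range(10)])
# samples has shape (10, n_samples, n_outputs); the spread along the first
# axis reflects the variability of the learned conditional distribution.
Y_mean_estimate = samples.mean(axis=0)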

def predict(self, X, *args, **kwargs):
    lat = self._generate_noise(len(X))
    Yp = self.generator.predict([X, lat], verbose=0)
    return Yp

def score(self, X, Y, **kwargs)

Score the generative model on the real data.

Parameters

Y : {array-like, sparse matrix}, shape [n_samples, ...]
    The data that should be generated by the model.

X : {array-like, sparse matrix}, shape [n_samples, ...]
    The data used to condition the generative model's outputs.
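
A sketch of typical use (the returned value is whatever metric distribution_similarity in noxer.gm.base implements, so its scale and orientation depend on that function):

# Hold out some rows of the toy data, then compare the distribution of
# generated samples against the real held-out targets.
s = gen.score(X[200:], Y[200:])
print('distribution similarity:', s)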

def score(self, X, Y, **kwargs):
    """Score the generative model on the real data.
    Parameters
    ----------
    Y : {array-like, sparse matrix}, shape [n_samples, ...]
        The data that should be generated by particular model.
    X : {array-like, sparse matrix}, shape [n_samples, ...]
        The data used to condition the generative model's outputs.
    """
    Yp = self.predict(X, **kwargs)
    score = distribution_similarity(Y, Yp)  # similarity metric resolved in noxer.gm.base
    return score

def set_params(self, **params)

Set the parameters of this estimator.

The method works on simple estimators as well as on nested objects (such as pipelines). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.

Returns

self
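
A small sketch of typical use:

gen = VaeGenerator()
gen.set_params(epochs=100, lr=5e-4)  # returns self, so calls can be chained

# Unknown parameter names raise a ValueError instead of being silently
# ignored, e.g. gen.set_params(learning_rate=0.1) fails because the
# parameter is named 'lr'.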

def set_params(self, **params):
    """Set the parameters of this estimator.
    The method works on simple estimators as well as on nested objects
    (such as pipelines). The latter have parameters of the form
    ``<component>__<parameter>`` so that it's possible to update each
    component of a nested object.
    Returns
    -------
    self
    """
    if not params:
        # Simple optimization to gain speed (inspect is slow)
        return self
    valid_params = self.get_params(deep=True)
    nested_params = defaultdict(dict)  # grouped by prefix
    for key, value in params.items():
        key, delim, sub_key = key.partition('__')
        if key not in valid_params:
            raise ValueError('Invalid parameter %s for estimator %s. '
                             'Check the list of available parameters '
                             'with `estimator.get_params().keys()`.' %
                             (key, self))
        if delim:
            nested_params[key][sub_key] = value
        else:
            setattr(self, key, value)
    for key, sub_params in nested_params.items():
        valid_params[key].set_params(**sub_params)
    return self

Instance variables

var D
    Weight of the reconstruction term relative to the KL term in the loss.

var batch_size
    Minibatch size used during training.

var beta_1
    Exponential decay rate for the first moment estimates in Adam.

var beta_2
    Exponential decay rate for the second moment estimates in Adam.

var epochs
    Number of passes over the training data.

var epsilon_std
    Standard deviation of the latent sampling noise.

var intermediate_dim
    Width of the hidden layers in the encoder and decoder.

var latent_dim
    Dimensionality of the latent space.

var lr
    Learning rate of the Adam optimizer.
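
Of these, D directly sets the trade-off between the two loss terms; a sketch of how one might compare two settings (values illustrative):

# Larger D weights reconstruction accuracy more heavily; smaller D pushes
# the latent code closer to the unit Gaussian prior.
precise_gen = VaeGenerator(D=10.0)
smooth_gen = VaeGenerator(D=0.1)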