Implementation of ACGAN, based on

Implementation of ACGAN, based on

from sklearn.preprocessing import LabelBinarizer

import torch, time, os, pickle
import numpy as np
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from import DataLoader, TensorDataset
from torchvision import datasets, transforms

def initialize_weights(net):
    for m in net.modules():
        if isinstance(m, nn.Conv2d):
  , 0.02)
        elif isinstance(m, nn.ConvTranspose2d):
  , 0.02)
        elif isinstance(m, nn.Linear):
  , 0.02)

class generator(nn.Module):
    # Network Architecture is exactly same as in infoGAN (
    # Architecture : FC1024_BR-FC7x7x128_BR-(64)4dc2s_BR-(1)4dc2s_S
    def __init__(self, height, width, channels):
        super(generator, self).__init__()
        self.input_height = height
        self.input_width = width
        self.input_dim = 62 + 10
        self.output_dim = channels

        self.fc = nn.Sequential(
            nn.Linear(self.input_dim, 1024),
            nn.Linear(1024, 128 * (self.input_height // 4) * (self.input_width // 4)),
            nn.BatchNorm1d(128 * (self.input_height // 4) * (self.input_width // 4)),
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(128, 64, 4, 2, 1),
            nn.ConvTranspose2d(64, self.output_dim, 4, 2, 1),

    def forward(self, input, label):
        x =[input, label], 1)
        x = self.fc(x)
        x = x.view(-1, 128, (self.input_height // 4), (self.input_width // 4))
        x = self.deconv(x)

        return x

class discriminator(nn.Module):
    # Network Architecture is exactly same as in infoGAN (
    # Architecture : (64)4c2s-(128)4c2s_BL-FC1024_BL-FC1_S
    def __init__(self, height, width, channels):
        super(discriminator, self).__init__()
        self.input_height = height
        self.input_width = width
        self.input_dim = channels
        self.output_dim = 1
        self.class_num = 10

        self.conv = nn.Sequential(
            nn.Conv2d(self.input_dim, 64, 4, 2, 1),
            nn.Conv2d(64, 128, 4, 2, 1),
        self.fc1 = nn.Sequential(
            nn.Linear(128 * (self.input_height // 4) * (self.input_width // 4), 1024),
        self.dc = nn.Sequential(
            nn.Linear(1024, self.output_dim),
        ) = nn.Sequential(
            nn.Linear(1024, self.class_num),

    def forward(self, input):
        x = self.conv(input)
        x = x.view(-1, 128 * (self.input_height // 4) * (self.input_width // 4))
        x = self.fc1(x)
        d = self.dc(x)
        c =

        return d, c

class ACGAN(object):
    def __init__(self, height, width, channels,
                 use_gpu=True, epochs=32, batch_size=64,
                 lrG=0.0002, lrD=0.0002, beta1=0.5, beta2=0.999,
        # parameters
        self.height = height
        self.width= width
        self.channels = channels
        self.epoch = epochs
        self.sample_num = 100
        self.batch_size = batch_size
        self.gpu_mode = use_gpu
        self.verbosity = verbose
        self.lrG = lrG
        self.lrD = lrD
        self.beta1 = beta1
        self.beta2 = beta2
        self.verbose = beta2

    def notify(self, message, verbosity=1):
        if self.verbosity >= verbosity:

    def train(self, X, Y, callback=None):
        self.train_hist = {}
        self.train_hist['D_loss'] = []
        self.train_hist['G_loss'] = []
        self.train_hist['per_epoch_time'] = []
        self.train_hist['total_time'] = []

        # networks init
        self.G = generator(self.height, self.width, self.channels)
        self.D = discriminator(self.height, self.width, self.channels)
        G_optimizer = optim.Adam(self.G.parameters(), lr=self.lrG, betas=(self.beta1, self.beta2))
        D_optimizer = optim.Adam(self.D.parameters(), lr=self.lrD, betas=(self.beta1, self.beta2))

        if self.gpu_mode:
            BCE_loss = nn.BCELoss().cuda()
            CE_loss = nn.CrossEntropyLoss().cuda()
            BCE_loss = nn.BCELoss()
            CE_loss = nn.CrossEntropyLoss()

        # print('---------- Networks architecture -------------')
        # utils.print_network(self.G)
        # utils.print_network(self.D)
        # print('-----------------------------------------------')

        # load mnist
        self.z_dim = 62
        self.y_dim = 10

        # fixed noise & condition
        sample_z_ = torch.zeros((self.sample_num, self.z_dim))
        for i in range(10):
            sample_z_[i * self.y_dim] = torch.rand(1, self.z_dim)
            for j in range(1, self.y_dim):
                sample_z_[i * self.y_dim + j] = sample_z_[i * self.y_dim]

        temp = torch.zeros((10, 1))
        for i in range(self.y_dim):
            temp[i, 0] = i

        temp_y = torch.zeros((self.sample_num, 1))
        for i in range(10):
            temp_y[i * self.y_dim: (i + 1) * self.y_dim] = temp

        sample_y_ = torch.zeros((self.sample_num, self.y_dim))
        sample_y_.scatter_(1, temp_y.type(torch.LongTensor), 1)

        # setup dataset
        data_Y, data_X = torch.FloatTensor(X), torch.FloatTensor(Y)

        if self.gpu_mode:
            y_real_, y_fake_ = Variable(torch.ones(self.batch_size, 1).cuda()), Variable(torch.zeros(self.batch_size, 1).cuda())
            y_real_, y_fake_ = Variable(torch.ones(self.batch_size, 1)), Variable(torch.zeros(self.batch_size, 1))

        self.notify('training start!!')
        start_time = time.time()
        for epoch in range(self.epoch):
            epoch_start_time = time.time()
            for iter in range(len(data_X) // self.batch_size):
                x_ = data_X[iter*self.batch_size:(iter+1)*self.batch_size]
                z_ = torch.rand((self.batch_size, self.z_dim))
                y_vec_ = data_Y[iter*self.batch_size:(iter+1)*self.batch_size]

                if self.gpu_mode:
                    x_, z_, y_vec_ = Variable(x_.cuda()), Variable(z_.cuda()), Variable(y_vec_.cuda())
                    x_, z_, y_vec_ = Variable(x_), Variable(z_), Variable(y_vec_)

                # update D network

                D_real, C_real = self.D(x_)
                D_real_loss = BCE_loss(D_real, y_real_)
                mxv = torch.max(y_vec_, 1)[1]
                C_real_loss = CE_loss(C_real, mxv)

                G_ = self.G(z_, y_vec_)
                D_fake, C_fake = self.D(G_)
                D_fake_loss = BCE_loss(D_fake, y_fake_)
                mxv = torch.max(y_vec_, 1)[1]
                C_fake_loss = CE_loss(C_fake, mxv)

                D_loss = D_real_loss + C_real_loss + D_fake_loss + C_fake_loss


                # update G network

                G_ = self.G(z_, y_vec_)
                D_fake, C_fake = self.D(G_)

                G_loss = BCE_loss(D_fake, y_real_)
                C_fake_loss = CE_loss(C_fake, torch.max(y_vec_, 1)[1])

                G_loss += C_fake_loss


                if ((iter + 1) % 10) == 0:
                    self.notify("Epoch: [%2d] [%4d/%4d] D_loss: %.8f, G_loss: %.8f" %
                          ((epoch + 1), (iter + 1), len(data_X) // self.batch_size,[0],[0]))

            if callback is not None:

            self.train_hist['per_epoch_time'].append(time.time() - epoch_start_time)

        self.train_hist['total_time'].append(time.time() - start_time)
        self.notify("Avg one epoch time: %.2f, total %d epochs time: %.2f" % (np.mean(self.train_hist['per_epoch_time']),
                                                                        self.epoch, self.train_hist['total_time'][0]))

def gpu_setting(kwargs):
    if 'gpu' in kwargs:
        use_gpu = kwargs['gpu']
        use_gpu = True

    return use_gpu

def get_callback(kwargs):
    if 'callback' in kwargs:
        return kwargs['callback']
    return None

class ACGANCategoryToImageGenerator(GeneratorBase):
    def __init__(self, use_gpu=True, epochs=32, batch_size=64,
                 lrG=0.0002, lrD=0.0002, beta1=0.5, beta2=0.999,
        self.use_gpu = use_gpu
        self.epochs = epochs
        self.batch_size = batch_size
        self.lrG = lrG
        self.lrD = lrD
        self.beta1 = beta1
        self.beta2 = beta2
        self.verbose = verbose = None
        self.bin = None

    def fit(self, X, Y, **kwargs):
        # shuffle dimensions to fit to pytorch conventions
        Y = np.transpose(Y, (0, 3, 1, 2))

        height = Y.shape[2]
        width = Y.shape[3]
        channels = Y.shape[1]

        # binarize the labels
        self.bin = LabelBinarizer()
        X = self.bin.fit_transform(X) = ACGAN(
            height, width, channels,
            epochs = self.epochs,
            batch_size = self.batch_size,
            lrG = self.lrG,
            lrD = self.lrD,
            beta1 = self.beta1,
            beta2 = self.beta2,
            verbose = self.verbose,
        ), Y, callback=get_callback(kwargs))

    def predict_noise(self, X, Z, **kwargs):
        if is None:
            raise RuntimeError("Please run the fitting procedure first!")

        iX = torch.FloatTensor(self.bin.transform(X))

        if gpu_setting(kwargs):
            iZ, iX = Variable(Z.cuda(), volatile=True), Variable(iX.cuda(), volatile=True)
            iZ, iX = Variable(Z, volatile=True), Variable(iX, volatile=True)

        Y =, iX)
        Y = Y.cpu().data.numpy()
        Y = np.transpose(Y, (0, 2, 3, 1))
        return Y

    def predict(self, X, **kwargs):
        # generate noise
        Z = torch.rand((len(X),

        # make generation
        return self.predict_noise(X, Z, **kwargs)


All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).

class ACGANCategoryToImageGenerator(GeneratorBase):
    def __init__(self, use_gpu=True, epochs=32, batch_size=64,
                 lrG=0.0002, lrD=0.0002, beta1=0.5, beta2=0.999,
        self.use_gpu = use_gpu
        self.epochs = epochs
        self.batch_size = batch_size
        self.lrG = lrG
        self.lrD = lrD
        self.beta1 = beta1
        self.beta2 = beta2
        self.verbose = verbose = None
        self.bin = None

    def fit(self, X, Y, **kwargs):
        # shuffle dimensions to fit to pytorch conventions
        Y = np.transpose(Y, (0, 3, 1, 2))

        height = Y.shape[2]
        width = Y.shape[3]
        channels = Y.shape[1]

        # binarize the labels
        self.bin = LabelBinarizer()
        X = self.bin.fit_transform(X) = ACGAN(
            height, width, channels,
            epochs = self.epochs,
            batch_size = self.batch_size,
            lrG = self.lrG,
            lrD = self.lrD,
            beta1 = self.beta1,
            beta2 = self.beta2,
            verbose = self.verbose,
        ), Y, callback=get_callback(kwargs))

    def predict_noise(self, X, Z, **kwargs):
        if is None:
            raise RuntimeError("Please run the fitting procedure first!")

        iX = torch.FloatTensor(self.bin.transform(X))

        if gpu_setting(kwargs):
            iZ, iX = Variable(Z.cuda(), volatile=True), Variable(iX.cuda(), volatile=True)
            iZ, iX = Variable(Z, volatile=True), Variable(iX, volatile=True)

        Y =, iX)
        Y = Y.cpu().data.numpy()
        Y = np.transpose(Y, (0, 2, 3, 1))
        return Y

    def predict(self, X, **kwargs):
        # generate noise
        Z = torch.rand((len(X),

        # make generation
        return self.predict_noise(X, Z, **kwargs)

def __init__(

self, use_gpu=True, epochs=32, batch_size=64, lrG=0.0002, lrD=0.0002, beta1=0.5, beta2=0.999, verbose=0)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, use_gpu=True, epochs=32, batch_size=64,
             lrG=0.0002, lrD=0.0002, beta1=0.5, beta2=0.999,
    self.use_gpu = use_gpu
    self.epochs = epochs
    self.batch_size = batch_size
    self.lrG = lrG
    self.lrD = lrD
    self.beta1 = beta1
    self.beta2 = beta2
    self.verbose = verbose = None
    self.bin = None

def fit(

self, X, Y, **kwargs)

Fit generative model to the data.


Y : {array-like, sparse matrix}, shape [n_samples, ...] The data that should be generated by particular model.

X : {array-like, sparse matrix}, shape [n_samples, ...] The data used to condition the generative model's outputs.

def fit(self, X, Y, **kwargs):
    # shuffle dimensions to fit to pytorch conventions
    Y = np.transpose(Y, (0, 3, 1, 2))
    height = Y.shape[2]
    width = Y.shape[3]
    channels = Y.shape[1]
    # binarize the labels
    self.bin = LabelBinarizer()
    X = self.bin.fit_transform(X) = ACGAN(
        height, width, channels,
        epochs = self.epochs,
        batch_size = self.batch_size,
        lrG = self.lrG,
        lrD = self.lrD,
        beta1 = self.beta1,
        beta2 = self.beta2,
        verbose = self.verbose,
    ), Y, callback=get_callback(kwargs))

def get_params(

self, deep=True)

Get parameters for this estimator.


deep : boolean, optional If True, will return the parameters for this estimator and contained subobjects that are estimators.


params : mapping of string to any Parameter names mapped to their values.

def get_params(self, deep=True):
    """Get parameters for this estimator.
    deep : boolean, optional
        If True, will return the parameters for this estimator and
        contained subobjects that are estimators.
    params : mapping of string to any
        Parameter names mapped to their values.
    out = dict()
    for key in self._get_param_names():
        # We need deprecation warnings to always be on in order to
        # catch deprecated param values.
        # This is set in utils/ but it gets overwritten
        # when running under python3 somehow.
        warnings.simplefilter("always", DeprecationWarning)
            with warnings.catch_warnings(record=True) as w:
                value = getattr(self, key, None)
            if len(w) and w[0].category == DeprecationWarning:
                # if the parameter is deprecated, don't show it
        # XXX: should we rather test if instance of estimator?
        if deep and hasattr(value, 'get_params'):
            deep_items = value.get_params().items()
            out.update((key + '__' + k, val) for k, val in deep_items)
        out[key] = value
    return out

def predict(

self, X, **kwargs)

Make estimations with generative model.


X : {array-like, sparse matrix}, shape [n_samples, ...] The data used to condition the generative model's outputs.


Y : {array-like, sparse matrix}, shape [n_samples, ...] The data that is generated by a generative model.

def predict(self, X, **kwargs):
    # generate noise
    Z = torch.rand((len(X),
    # make generation
    return self.predict_noise(X, Z, **kwargs)

def predict_noise(

self, X, Z, **kwargs)

def predict_noise(self, X, Z, **kwargs):
    if is None:
        raise RuntimeError("Please run the fitting procedure first!")
    iX = torch.FloatTensor(self.bin.transform(X))
    if gpu_setting(kwargs):
        iZ, iX = Variable(Z.cuda(), volatile=True), Variable(iX.cuda(), volatile=True)
        iZ, iX = Variable(Z, volatile=True), Variable(iX, volatile=True)
    Y =, iX)
    Y = Y.cpu().data.numpy()
    Y = np.transpose(Y, (0, 2, 3, 1))
    return Y

def score(

self, X, Y, **kwargs)

Score the generative model on the real data.


Y : {array-like, sparse matrix}, shape [n_samples, ...] The data that should be generated by particular model.

X : {array-like, sparse matrix}, shape [n_samples, ...] The data used to condition the generative model's outputs.

def score(self, X, Y, **kwargs):
    """Score the generative model on the real data.
    Y : {array-like, sparse matrix}, shape [n_samples, ...]
        The data that should be generated by particular model.
    X : {array-like, sparse matrix}, shape [n_samples, ...]
        The data used to condition the generative model's outputs.
    Yp = self.predict(X, **kwargs)
    score = distribution_similarity(Y, Yp)
    return score

def set_params(

self, **params)

Set the parameters of this estimator.

The method works on simple estimators as well as on nested objects (such as pipelines). The latter have parameters of the form <component>__<parameter> so that it's possible to update each component of a nested object.



def set_params(self, **params):
    """Set the parameters of this estimator.
    The method works on simple estimators as well as on nested objects
    (such as pipelines). The latter have parameters of the form
    ``<component>__<parameter>`` so that it's possible to update each
    component of a nested object.
    if not params:
        # Simple optimization to gain speed (inspect is slow)
        return self
    valid_params = self.get_params(deep=True)
    nested_params = defaultdict(dict)  # grouped by prefix
    for key, value in params.items():
        key, delim, sub_key = key.partition('__')
        if key not in valid_params:
            raise ValueError('Invalid parameter %s for estimator %s. '
                             'Check the list of available parameters '
                             'with `estimator.get_params().keys()`.' %
                             (key, self))
        if delim:
            nested_params[key][sub_key] = value
            setattr(self, key, value)
    for key, sub_params in nested_params.items():
    return self

