Estimating MI by the proposed adaptive label smoothing classifier-based model (AdapLS)¶

Setup¶

The following cell imports the necessary packages:

import torch
import os
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import argparse
from torch.autograd import Variable
import itertools
from tqdm import tqdm
from data.mix_gaussian import MixedGaussian
from data.gaussian import Gaussian
from model.utils import *
from torch.utils.tensorboard import SummaryWriter 

Use the GPU if one is available.

cuda = True if torch.cuda.is_available() else False
FloatTensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if cuda else torch.LongTensor
torch.set_default_tensor_type(FloatTensor)

name = './results/AdapLS' # filename
chkpt_name = name+'.pt'                         # checkpoint
writer = SummaryWriter('./results/log/AdapLS')

Set random seed for reproducibility.

SEED = 0
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
np.random.seed(SEED)

Specify the hyper-parameters:

parser = argparse.ArgumentParser()
parser.add_argument("--rho", type=float, default=0.9, help="correlation coefficient of the Gaussian")
parser.add_argument("--d", type=int, default=6, help="dimension of X & Y")
parser.add_argument("--sample_size", type=int, default=400, help="sample size")
parser.add_argument("--gamma", type=float, default=0, help="clipping parameter")
parser.add_argument("--b1", type=float, default=0.5, help="adam: decay of first order momentum of gradient")
parser.add_argument("--b2", type=float, default=0.999, help="adam: decay of second order momentum of gradient")
parser.add_argument("--n_iters", type=int, default=60000, help="number of training iterations")
parser.add_argument("--batch_size", type=int, default=100, help="size of the batches")
parser.add_argument("--lr", type=float, default=1e-4, help="adam: learning rate")
parser.add_argument("--hidden_dim", type=int, default=100, help="Hidden dimension")
parser.add_argument("--c_0_1_ratio", type=float, default=1, help="ratio of samples with label 0 to samples with label 1")
parser.add_argument("--ma_rate", type=float, default=0.1, help="moving average rate")
parser.add_argument("--ma_ef", type=float, default=1, help="moving average ef")
parser.add_argument("--alpha", type=float, default=1e-4, help="smoothing parameter")

opt, unknown = parser.parse_known_args()
# adaptive LS parameters
a, b, c = 0.01, 1e-8, 1-1e-8  # a is \alpha_0, b is \tau and c is 1-\tau in the paper

Specify the distribution to generate the data samples:

# Two choices here: 'Gaussian' and 'MixedGaussian'
density = 'Gaussian'

Model¶

Define the function generate_data for generating samples from the Gaussian or mixed Gaussian distribution.

  • When generating the Gaussian distribution, \({X}\) and \({Y}\) are distributed as \begin{align} p_{XY} = \prod_{i=1}^d \mathcal{N} \left( \mathbf{0}, \begin{bmatrix}1 & \rho \\ \rho & 1 \end{bmatrix} \right) \end{align}

  • When generating the mixed Gaussian distribution, \({X}\) and \({Y}\) are distributed as \begin{align} p_{XY} = \frac{1}{2} \prod_{i=1}^d \mathcal{N} \left( \mathbf{0}, \begin{bmatrix}1 & \rho \\ \rho & 1 \end{bmatrix} \right) + \frac{1}{2} \prod_{i=1}^d \mathcal{N} \left( \mathbf{0}, \begin{bmatrix}1 & -\rho \\ -\rho & 1 \end{bmatrix} \right) \end{align}

def generate_data(distribution='Gaussian', rho=0.9):
    # rho is the covariance for generating distributions
    # mu1 and mu2 are means for generating Mixed Gaussian distribution 
    mu1 = 0
    mu2 = 0

    # mg is an object of Class Gaussian or MixedGaussian
    if distribution=='Gaussian':
        mg = Gaussian(sample_size=opt.sample_size,rho=rho)
    else:
        mg = MixedGaussian(sample_size=opt.sample_size, mean1=mu1, mean2=mu2,rho1=rho, rho2=-rho)

    # Calculate the ground truth MI between X and Y for (X, Y) from mg
    mi = mg.ground_truth * opt.d

    # Create X, Y for storing generated samples
    X = np.zeros((opt.sample_size,opt.d))
    Y = np.zeros((opt.sample_size,opt.d))

    # Generate samples of random variable X,Y and XY
    for j in range(opt.d):
        # In each iteration, mg.data generates fresh two-dimensional samples: one dimension for X and one for Y
        data = mg.data
        X[:,j] = data[:,0]
        Y[:,j] = data[:,1]        
    X = torch.Tensor(X)
    Y = torch.Tensor(Y)
    XY = torch.cat((X, Y), dim=1)

    return XY, X, Y, mi
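
As a quick sanity check (a minimal sketch: it draws a fresh set of samples and assumes the ground truth of the jointly Gaussian case follows the standard closed form \(-\frac{d}{2}\ln(1-\rho^2)\)), the value returned by generate_data can be compared against the analytic MI:

_, _, _, mi_chk = generate_data(distribution='Gaussian', rho=opt.rho)
analytic_mi = -0.5 * opt.d * np.log(1 - opt.rho**2)   # closed-form MI for d independent Gaussian pairs with correlation rho
print('generate_data ground truth: %.4f, closed form: %.4f' % (mi_chk, analytic_mi))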

The function acti_func below computes the adaptive smoothing weight. (The motivation and the precise definition of acti_func are given in the Training section below.)

def acti_func(x, a, b, c):
    # a is \alpha_0, b is \tau and c is 1-\tau in the paper
    alpha = torch.zeros_like(x)
    x_cpu = x.cpu()
    # ramp linearly from a down to 0 as the output moves from 0 up to b
    alpha[np.where(x_cpu<=b)] = - a*x[np.where(x_cpu<=b)]/b + a
    # no smoothing when the output lies in (b, c)
    alpha[np.where((x_cpu>b) & (x_cpu<c))] = 0
    # ramp linearly from 0 up to a as the output moves from c up to 1
    alpha[np.where(x_cpu>=c)] = a*x[np.where(x_cpu>=c)]/(1-c) + a*c/(c-1)
    return alpha
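
To visualise the shape of the adaptive weight, the sketch below evaluates acti_func on a grid of classifier outputs. The thresholds 0.2 and 0.8 are used purely for illustration (much larger than the values of b and c set above) so that the two linear ramps are visible:

xs = torch.linspace(0, 1, 101)
weights = acti_func(xs, 0.01, 0.2, 0.8)   # illustrative thresholds only, not the training values
plt.plot(xs.cpu().numpy(), weights.cpu().numpy())
plt.xlabel('classifier output')
plt.ylabel('adaptive smoothing weight')
# the weight is 0.01 at outputs 0 and 1, decays linearly to 0 at 0.2 and 0.8, and is 0 in between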

Define the neural network.

class Net(nn.Module):
    # Fully-connected network; the forward pass returns the classifier output and the adaptive smoothing weight
    def __init__(self, input_size=2, hidden_size=100, sigma=0.02):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 1)
        nn.init.normal_(self.fc1.weight, std=sigma)
        nn.init.constant_(self.fc1.bias, 0)
        nn.init.normal_(self.fc2.weight, std=sigma)
        nn.init.constant_(self.fc2.bias, 0)
        nn.init.normal_(self.fc3.weight, std=sigma)
        nn.init.constant_(self.fc3.bias, 0)

    def forward(self, input):
        output = F.elu(self.fc1(input))
        output = F.elu(self.fc2(output))
        output = torch.sigmoid(self.fc3(output))
        
        alpha = acti_func(output, a, b, c)

        return output, alpha

The function _resample below draws a random mini-batch from the given data samples for training the neural network.

def _resample(data, batch_size, replace=False):
    # Resample the given data sample.
    index = np.random.choice(
        range(data.shape[0]), size=batch_size, replace=replace)
    batch = data[index]
    return batch
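
As a toy illustration of how this is used in the training loop below (toy_X and toy_Y are made-up correlated samples, not the data of this notebook): resampling the concatenated pair keeps the (x, y) pairing and gives a batch from the joint distribution, while resampling X and Y separately breaks the pairing and approximates a batch from the product of the marginals.

toy_X = torch.randn(8, 1)                   # made-up samples of X
toy_Y = toy_X + 0.1 * torch.randn(8, 1)     # made-up samples of Y, correlated with X
joint_batch = _resample(torch.cat((toy_X, toy_Y), dim=1), batch_size=4)   # pairing kept: ~ p_{XY}
marginal_batch = torch.cat((_resample(toy_X, batch_size=4),
                            _resample(toy_Y, batch_size=4)), dim=1)       # pairing broken: ~ p_X p_Y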

The function mi_estimate below returns the empirical estimate of MI given by the Probabilistic Classifier Method (PCM): \begin{align} I(X, Y) = E \left[ \ln r(X, Y) \right], \end{align} where \begin{align} r(X, Y) := \frac{p_{C}(0)}{p_{C}(1)} \cdot \frac{p_{{C} | {\tilde{X}}{\tilde{Y}}} (1|x,y)}{p_{{C} | {\tilde{X}}{\tilde{Y}}} (0|x,y)}, \end{align} and \(p_{{C} | {\tilde{X}}{\tilde{Y}}}(1|x,y)\) is implemented by a neural network and may be clipped to a certain range before being used to compute the MI estimate.
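
To see why this recovers the MI, recall that label \(C = 1\) is assigned to samples drawn from \(p_{XY}\) and label \(C = 0\) to samples drawn from \(p_{X}p_{Y}\). For the Bayes-optimal classifier, \begin{align} \frac{p_{{C} | {\tilde{X}}{\tilde{Y}}} (1|x,y)}{p_{{C} | {\tilde{X}}{\tilde{Y}}} (0|x,y)} = \frac{p_{C}(1)}{p_{C}(0)} \cdot \frac{p_{XY}(x,y)}{p_{X}(x)\,p_{Y}(y)}, \end{align} so the prior ratio \(\frac{p_{C}(0)}{p_{C}(1)}\) in \(r\) cancels and \(r(X, Y)\) reduces to the density ratio \(\frac{p_{XY}}{p_{X}p_{Y}}\), whose expected logarithm under \(p_{XY}\) is exactly \(I(X, Y)\).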

def mi_estimate(model, XY, gamma, c_0_1_ratio):
    # c_0_1_ratio = p_{C}(0) / p_{C}(1)
    pre, _ = model(XY)
    pre = pre.clamp(min=gamma, max=1-gamma) # clip the output pre as controlled by gamma; gamma = 0 means no clipping
    MI_est = torch.log(c_0_1_ratio*pre/((1-pre).clamp(min=gamma, max=1-gamma))).mean()

    return MI_est

The function smooth_ce_loss returns the binary cross-entropy loss of the predicted value and the smoothed target.

def smooth_ce_loss(pre_label, true_label, smoothing, num_classes):
    new_labels = (1.0 - smoothing) * true_label + smoothing / num_classes
    return torch.nn.BCELoss()(pre_label, new_labels)
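
As a quick illustrative check (pred and hard below are made-up values, and a scalar smoothing weight is used in place of the per-sample weight returned by the network), a smoothing weight of 0.1 with num_classes = 2 moves a hard target of 1 to 0.95 and a hard target of 0 to 0.05:

pred = torch.tensor([[0.9], [0.2]])   # made-up classifier outputs
hard = torch.tensor([[1.0], [0.0]])   # hard labels: joint sample, marginal sample
# the smoothed targets become 0.95 and 0.05, and the usual BCE loss is taken against them
loss_example = smooth_ce_loss(pred, hard, smoothing=0.1, num_classes=2)
print(loss_example)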

The neural network discriminator below implements and outputs \(p_{{C} | {\tilde{X}}{\tilde{Y}}}(1|x,y)\) before clipping:

discriminator = Net(input_size=opt.d*2, hidden_size=100)
# move the NN model to the GPU if one is available
if cuda:
    discriminator.cuda()

# Adam optimizer
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=opt.lr, betas=(opt.b1, opt.b2))

Load previous results.

load_available = True # set to False to prevent loading previous results
if load_available and os.path.exists(chkpt_name):
    checkpoint = torch.load(
        chkpt_name, map_location='cuda' if torch.cuda.is_available() else 'cpu')
    mi_list = checkpoint['mi_list']
    model_state = checkpoint['model_state']
    discriminator.load_state_dict(model_state)
    print('Previous results loaded.')
else:
    mi_list = [] # storing the mi estimation of each iteration
Previous results loaded.

Training¶

We want to obtain \(p_{{C} | {\tilde{X}}{\tilde{Y}}}(1|x,y)\) by a neural network \(\hat{p}_{\theta}\).

To train the neural network \(\hat{p}_{\theta}\), instead of minimizing the cross-entropy of \(\hat{p}_{\theta}\) relative to \(p_{{C} | {\tilde{X}}{\tilde{Y}}}\), we propose minimizing the cross-entropy of \(\hat{p}_{\theta}\) relative to the distribution \begin{align} p^{\prime}_{{C} | {\tilde{X}}{\tilde{Y}}} := (1-\alpha) \cdot p_{{C} | {\tilde{X}}{\tilde{Y}}} + \alpha \cdot p_{U}, \end{align} where \(p_{U} := \frac{1}{2}\) is the uniform distribution over the labels \(\{0, 1\}\) (in fact \(p_{U}\) can be any distribution over the labels), and \(\alpha\) is the smoothing weight.

Here, we use an adaptive smoothing weight \(\alpha\), given by the function acti_func called in the forward pass of the neural network \(\hat{p}_{\theta}\). The adaptive smoothing weight \(\alpha\) depends on the input sample and the neural network, and is defined as follows: \begin{equation*} \alpha = \left\{ \begin{array}{ll} -\frac{\alpha_{0}}{\tau}\hat{p}_{\theta}(1|\tilde{X},\tilde{Y})+\alpha_{0}, & \hat{p}_{\theta}(1|\tilde{X},\tilde{Y})\in[0,\tau], \\ 0, & \hat{p}_{\theta}(1|\tilde{X},\tilde{Y})\in(\tau,1-\tau), \\ \frac{\alpha_{0}}{\tau}\hat{p}_{\theta}(1|\tilde{X},\tilde{Y})+\frac{\tau-1}{\tau}\alpha_{0}, & \hat{p}_{\theta}(1|\tilde{X},\tilde{Y})\in[1-\tau,1], \end{array} \right. \end{equation*} where \(\alpha_{0} \in [0, 1]\) is the maximum regularization weight and \(\tau \in (0, \frac{1}{2})\) is the threshold for judging whether the classifier is starting to become overconfident.
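
For instance, with \(\alpha_{0} = 0.01\) and \(\tau = 10^{-8}\) as set above, a prediction \(\hat{p}_{\theta}(1|\tilde{X},\tilde{Y}) = 0.5\) receives no smoothing (\(\alpha = 0\)), while a fully confident prediction of \(0\) or \(1\) receives the maximum weight \(\alpha = \alpha_{0} = 0.01\); within \([0, \tau]\) and \([1-\tau, 1]\) the weight ramps linearly between these two extremes.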

We train the neural network with the adaptive smoothing weight and \(p^{\prime}_{{C} | {\tilde{X}}{\tilde{Y}}}\). When we evaluate the MI estimate with the trained neural network, however, we use the function mi_estimate, which is based on the Probabilistic Classifier Method (PCM) and uses \(p_{{C} | {\tilde{X}}{\tilde{Y}}}\) instead of \(p^{\prime}_{{C} | {\tilde{X}}{\tilde{Y}}}\).

The following cell trains the neural network using the data samples.

XY, X, Y, Ground_truth = generate_data(distribution=density, rho=opt.rho)
continue_train = False  # set to True to continue to train
if continue_train:
    for i in range(opt.n_iters):
        data_joint = _resample(XY, batch_size=opt.batch_size)
        X_ref = _resample(X, batch_size=opt.batch_size)
        Y_ref = _resample(Y, batch_size=opt.batch_size)
        data_margin = torch.cat((X_ref, Y_ref), dim=1)
        valid = Variable(torch.Tensor(opt.batch_size, 1).fill_(1.0), requires_grad=False)
        fake = Variable(torch.Tensor(opt.batch_size, 1).fill_(0.0), requires_grad=False)
        train_data = torch.cat((data_joint, data_margin), dim=0)
        labels = torch.cat((valid, fake), dim=0)
        pred_label, alpha = discriminator(train_data) # alpha is the adaptive label smoothing weight
        # print(alpha)
        c_0_1_ratio = data_margin.shape[0]/data_joint.shape[0] # c_0_1_ratio = p_{C}(0) / p_{C}(1) 
        
        optimizer_D.zero_grad()
        loss = smooth_ce_loss(pred_label, labels, alpha.detach(), 2) # cross-entropy of $\hat{p}_{\theta}$ relative to the distribution p^{'}_{C}
        loss.backward()
        optimizer_D.step()
        
        with torch.no_grad():
            mi_est = mi_estimate(discriminator, XY, opt.gamma, c_0_1_ratio)
            mi_list.append(mi_est.item())
        writer.add_scalar('mi_list', mi_est.item(), i)
        writer.add_scalar('loss', loss, i)
        if i%500==0:
            print("Iteration: %d, loss: %f, mi_est: %f"%(i, loss.item(), mi_est))
writer.add_graph(discriminator, (XY,))

MI estimates¶

Use a moving average to smooth the MI estimates over the training iterations.

ma_rate = 0.01
mi_copy = mi_list.copy()
for k in range(1,len(mi_list)):
    mi_copy[k] = (1-ma_rate) * mi_copy[k-1] + ma_rate * mi_copy[k]

Plot the MI estimation curve against the training iteration, together with the ground truth.

plt.plot(mi_copy, label='MI estimate')
plt.axhline(Ground_truth,label='ground truth',linestyle='--',color='red')
for t in range(len(mi_copy)):
    if (mi_copy[t]>.8*Ground_truth):
        plt.axvline(t,label='80% reached',linestyle=':',color='green')
        break
plt.xlabel('number of iterations')
plt.ylabel('MI estimation')
plt.legend()
<matplotlib.legend.Legend at 0x7f0e1c197e90>
[Figure: MI estimate vs. number of iterations, with the ground truth and the first iteration reaching 80% of it (_images/AdapLS_39_1.svg)]

Save the model.

overwrite = True  # set to True to overwrite previously stored results
if overwrite or not os.path.exists(chkpt_name):
    model_state = discriminator.state_dict()
    torch.save({
        'mi_list': mi_list,
        'mi_copy': mi_copy,
        'model_state': model_state
    }, chkpt_name)
    writer.close()
    print('Current results saved.')
Current results saved.