Estimating MI by Probabilistic Classifier Method (PCM)¶
Setup¶
The following cell imports the necessary packages:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os
import numpy as np
import matplotlib.pyplot as plt
import argparse
from torch.autograd import Variable
import itertools
from tqdm import tqdm
from data.mix_gaussian import MixedGaussian
from data.gaussian import Gaussian
from model.utils import *
from torch.utils.tensorboard import SummaryWriter
Use the GPU if one is available.
cuda = True if torch.cuda.is_available() else False
FloatTensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if cuda else torch.LongTensor
torch.set_default_tensor_type(FloatTensor)
name = './results/PCM' # filename
chkpt_name = name+'.pt' # checkpoint
writer = SummaryWriter('./results/log/PCM')
Set random seed for reproducibility.
SEED = 0
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
np.random.seed(SEED)
Specify the hyper-parameters:
parser = argparse.ArgumentParser()
parser.add_argument("--rho", type=float, default=0.9, help="coefficient of Gaussian")
parser.add_argument("--d", type=int, default=6, help="dimension of X & Y")
parser.add_argument("--sample_size", type=int, default=400, help="sample size")
parser.add_argument("--gamma", type=float, default=1e-8, help="clipping parameter")
parser.add_argument("--b1", type=float, default=0.5, help="adam: decay of first order momentum of gradient")
parser.add_argument("--b2", type=float, default=0.999, help="adam: decay of first order momentum of gradient")
parser.add_argument("--n_iters", type=int, default=60000, help="number of epochs of training")
parser.add_argument("--batch_size", type=int, default=100, help="size of the batches")
parser.add_argument("--lr", type=float, default=1e-4, help="adam: learning rate")
parser.add_argument("--hidden_dim", type=int, default=100, help="Hidden dimension")
parser.add_argument("--c_0_1_ratio", type=float, default=1, help="Ratio of samples with label 0 and samples with label 1 ")
parser.add_argument("--ma_rate", type=float, default=0.1, help="move average rate")
parser.add_argument("--ma_ef", type=float, default=1, help="move average ef")
parser.add_argument("--alpha", type=float, default=1e-4, help="smooth parameter")
opt, unknown = parser.parse_known_args()
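parse_known_args (rather than parse_args) ignores any extra command-line arguments, such as those a notebook kernel passes. To inspect the parsed hyper-parameters, or override some of them, a cell like the following can be used (the override values shown are only illustrative):
# Inspect the parsed hyper-parameters
print(vars(opt))
# Overriding selected options explicitly (illustrative values only):
# opt, unknown = parser.parse_known_args(['--rho', '0.8', '--d', '3'])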
Specify the distribution to generate the data samples:
# Two choices here: 'Gaussian' and 'MixedGaussian'
density = 'Gaussian'
Model¶
Define the function generate_data for generating samples from a Gaussian or mixed Gaussian distribution.
When generating the Gaussian distribution, \({X}\) and \({Y}\) are distributed as \begin{align} p_{XY} = \prod_{i=1}^d \mathcal{N} \left( \mathbf{0}, \begin{bmatrix} 1 & \rho \\ \rho & 1 \end{bmatrix} \right) \end{align}
When generating the mixed Gaussian distribution, \({X}\) and \({Y}\) are distributed as \begin{align} p_{XY} = \frac{1}{2} \prod_{i=1}^d \mathcal{N} \left( \mathbf{0}, \begin{bmatrix} 1 & \rho \\ \rho & 1 \end{bmatrix} \right) + \frac{1}{2} \prod_{i=1}^d \mathcal{N} \left( \mathbf{0}, \begin{bmatrix} 1 & -\rho \\ -\rho & 1 \end{bmatrix} \right) \end{align}
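For the Gaussian case, the ground-truth MI has the closed form \(I(X; Y) = -\frac{d}{2} \ln(1-\rho^2)\) nats; the value returned by generate_data below should agree with this, assuming the data class uses the standard formula. A quick check:
# Closed-form ground-truth MI for d i.i.d. bivariate Gaussian pairs with correlation rho
rho, d = opt.rho, opt.d
mi_closed_form = -0.5 * d * np.log(1 - rho**2)
print(mi_closed_form)  # about 4.98 nats for rho = 0.9, d = 6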
def generate_data(distribution='Gaussian', rho=0.9):
# rho is the covariance for generating distributions
# mu1 and mu2 are means for generating Mixed Gaussian distribution
mu1 = 0
mu2 = 0
# mg is an object of Class Gaussian or MixedGaussian
if distribution=='Gaussian':
mg = Gaussian(sample_size=opt.sample_size,rho=rho)
else:
mg = MixedGaussian(sample_size=opt.sample_size, mean1=mu1, mean2=mu2,rho1=rho, rho2=-rho)
# Calculate the ground truth MI between X and Y for (X, Y) from mg
mi = mg.ground_truth * opt.d
# Create X, Y for storing generated samples
X = np.zeros((opt.sample_size,opt.d))
Y = np.zeros((opt.sample_size,opt.d))
# Generate samples of random variable X,Y and XY
for j in range(opt.d):
        # In each iteration, mg.data generates a fresh pair of columns: one for X and one for Y
data = mg.data
X[:,j] = data[:,0]
Y[:,j] = data[:,1]
X = torch.Tensor(X)
Y = torch.Tensor(Y)
XY = torch.cat((X, Y), dim=1)
return XY, X, Y, mi
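For example, the following cell draws samples from the Gaussian density and prints their shapes together with the ground-truth MI (the _demo names are used only for this illustration):
# Draw samples and check shapes and the ground-truth MI
XY_demo, X_demo, Y_demo, mi_demo = generate_data(distribution='Gaussian', rho=opt.rho)
print(X_demo.shape, Y_demo.shape, XY_demo.shape)  # (sample_size, d), (sample_size, d), (sample_size, 2d)
print(mi_demo)  # ground-truth MI in nats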
Define the neural network.
class Net(nn.Module):
    # Defines the classifier network architecture
def __init__(self, input_size=2, hidden_size=100, sigma=0.02):
super().__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.fc3 = nn.Linear(hidden_size, 1)
nn.init.normal_(self.fc1.weight, std=sigma)
nn.init.constant_(self.fc1.bias, 0)
nn.init.normal_(self.fc2.weight, std=sigma)
nn.init.constant_(self.fc2.bias, 0)
nn.init.normal_(self.fc3.weight, std=sigma)
nn.init.constant_(self.fc3.bias, 0)
def forward(self, input):
output = F.elu(self.fc1(input))
output = F.elu(self.fc2(output))
        output = torch.sigmoid(self.fc3(output))
return output
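Since the last layer applies a sigmoid, the network always outputs a value in \((0, 1)\) that can be read as the class-1 probability. A quick sanity check (purely illustrative):
# Sanity check: the classifier outputs probabilities in (0, 1)
net_demo = Net(input_size=opt.d*2, hidden_size=opt.hidden_dim)
with torch.no_grad():
    probs = net_demo(torch.randn(5, opt.d*2))
print(probs.shape)                        # torch.Size([5, 1])
print(((probs > 0) & (probs < 1)).all())  # tensor(True)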
The function _resample below resamples the given data to form training batches for the neural network.
def _resample(data, batch_size, replace=False):
# Resample the given data sample.
index = np.random.choice(
range(data.shape[0]), size=batch_size, replace=replace)
batch = data[index]
return batch
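During training, a batch of joint samples is drawn by resampling the paired rows of XY, while a batch from the product of marginals is formed by resampling X and Y independently so that the pairing is broken. A small illustration (the _demo names are used only for this example):
# Joint batch: paired (x, y) rows kept together
XY_demo, X_demo, Y_demo, _ = generate_data(distribution=density, rho=opt.rho)
joint_batch = _resample(XY_demo, batch_size=opt.batch_size)
# Marginal batch: x and y resampled independently, breaking the pairing
x_ref = _resample(X_demo, batch_size=opt.batch_size)
y_ref = _resample(Y_demo, batch_size=opt.batch_size)
marginal_batch = torch.cat((x_ref, y_ref), dim=1)
print(joint_batch.shape, marginal_batch.shape)  # both (batch_size, 2d)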
The neural network discriminator below implements \(p_{{C} | \tilde{X}\tilde{Y}}(1|x,y)\) (before clipping):
discriminator = Net(input_size=opt.d*2, hidden_size=100)
# move the NN model to the GPU if one is available
if cuda:
discriminator.cuda()
# Adam optimizer
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=opt.lr, betas=(opt.b1, opt.b2))
Load previous results.
load_available = True # set to False to prevent loading previous results
if load_available and os.path.exists(chkpt_name):
checkpoint = torch.load(
chkpt_name, map_location='cuda' if torch.cuda.is_available() else 'cpu')
mi_list = checkpoint['mi_list']
model_state = checkpoint['model_state']
discriminator.load_state_dict(model_state)
print('Previous results loaded.')
else:
mi_list = [] # storing the mi estimation of each iteration
Previous results loaded.
Training¶
The estimate of MI given by the Probabilistic Classifier Method (PCM) is as follows: \begin{align} I(X, Y) = E \left[ \ln r(X, Y) \right], \end{align} where \begin{align} r(x, y) := \frac{p_{C}(0)}{p_{C}(1)} \cdot \frac{p_{{C} | \tilde{X}\tilde{Y}}(1|x,y)}{p_{{C} | \tilde{X}\tilde{Y}}(0|x,y)}. \end{align}
To obtain the MI estimate from the given samples with PCM, we train the neural network \(\hat{p}_{\theta}\) to approximate \(p_{{C} | \tilde{X}\tilde{Y}}(1|x,y)\) by minimizing the cross-entropy of \(\hat{p}_{\theta}\) relative to \(p_{{C} | \tilde{X}\tilde{Y}}\).
The output of the neural network \(\hat{p}_{\theta}\) may be clipped to a certain range before being used to calculate the MI estimate.
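The actual computation is done by the helper mi_estimate imported from model.utils, whose implementation is not shown here. The cell below is only a sketch of what such an estimator might compute, with gamma as the clipping parameter and alpha as the ratio of marginal (label 0) to joint (label 1) samples, i.e. \(p_{C}(0)/p_{C}(1)\):
import math

def mi_estimate_sketch(classifier, xy_joint, gamma, alpha):
    # Sketch only: the actual mi_estimate in model.utils may differ in details.
    with torch.no_grad():
        # p_theta(C = 1 | x, y), clipped away from 0 and 1 by gamma
        p = classifier(xy_joint).clamp(gamma, 1 - gamma)
        # ln r(x, y) = ln[p_C(0)/p_C(1)] + ln p(1|x,y) - ln p(0|x,y)
        log_r = math.log(alpha) + torch.log(p) - torch.log(1 - p)
        # MI estimate: average of ln r over the joint samples
        return log_r.mean()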
The following cell trains the neural network using the data samples.
XY, X, Y, Ground_truth = generate_data(distribution=density, rho=opt.rho)
continue_train = False # set to True to continue to train
if continue_train:
for i in range(opt.n_iters):
data_joint = _resample(XY, batch_size=opt.batch_size)
        X_ref = _resample(X, batch_size=opt.batch_size)
        Y_ref = _resample(Y, batch_size=opt.batch_size)
data_margin = torch.cat((X_ref, Y_ref), dim=1)
valid = Variable(torch.Tensor(opt.batch_size, 1).fill_(1.0), requires_grad=False)
fake = Variable(torch.Tensor(opt.batch_size, 1).fill_(0.0), requires_grad=False)
train_data = torch.cat((data_joint, data_margin), dim=0)
labels = torch.cat((valid, fake), dim=0)
pred_label = discriminator(train_data)
alpha = data_margin.shape[0]/data_joint.shape[0]
optimizer_D.zero_grad()
# PCM loss
loss = torch.nn.BCELoss()(pred_label, labels)
loss.backward()
optimizer_D.step()
with torch.no_grad():
mi_est = mi_estimate(discriminator, XY, opt.gamma, alpha)
if torch.isinf(mi_est):
mi_est = torch.from_numpy(np.array(mi_list[-1]))
mi_list.append(mi_est.item())
writer.add_scalar('mi_list', mi_est.item(), i)
writer.add_scalar('loss', loss, i)
if i%500==0:
print("Iternation: %d, loss: %f, mi_est: %f"%(i, loss.item(), mi_est))
writer.add_graph(discriminator, (XY,))
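The scalars logged above can be monitored with TensorBoard by pointing it at the log directory used by the SummaryWriter (this assumes TensorBoard is installed and, for the magic commands, that the Jupyter extension is available):
# In a notebook:
#   %load_ext tensorboard
#   %tensorboard --logdir ./results/log/PCM
# Or from a terminal:
#   tensorboard --logdir ./results/log/PCM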
MI estimates¶
Use a moving average to smooth the MI estimates over the training iterations.
ma_rate = 0.01
mi_copy = mi_list.copy()
for k in range(1,len(mi_list)):
mi_copy[k] = (1-ma_rate) * mi_copy[k-1] + ma_rate * mi_copy[k]
Plot the MI estimate against the training iteration, together with the ground truth.
plt.plot(mi_copy, label='MI estimate')
plt.axhline(Ground_truth,label='ground truth',linestyle='--',color='red')
for t in range(len(mi_copy)):
if (mi_copy[t]>.8*Ground_truth):
plt.axvline(t,label='80% reached',linestyle=':',color='green')
break
plt.xlabel('number of iterations')
plt.ylabel('MI estimation')
plt.legend()
Save the model.
overwrite = True # set to True to overwrite previously stored results
if overwrite or not os.path.exists(chkpt_name):
model_state = discriminator.state_dict()
torch.save({
'mi_list': mi_list,
'mi_copy': mi_copy,
'model_state': model_state
}, chkpt_name)
writer.close()
print('Current results saved.')
Current results saved.