"""Generates synthetic datasets with introducing noise by flipping certain
percentage of labels for testing and evaluating machine learning models."""
import logging
from abc import ABCMeta
import numpy as np
from scipy.stats import multivariate_normal
from scipy.stats import ortho_group
from sklearn.utils import check_random_state, shuffle
from .utils import FACTOR, pdf
from ..utilities import *
__all__ = ["SyntheticDatasetGenerator"]
[docs]
class SyntheticDatasetGenerator(metaclass=ABCMeta):
    """Generator for synthetic datasets with a focus on generating data with
    varying class distances.

    This class generates synthetic datasets by adjusting the distance between class distributions, allowing
    for the simulation of scenarios with varying levels of overlap between classes. It is designed to help
    in testing classifiers on datasets with controlled class separability.

    Every class ``k`` is modelled as a multivariate normal distribution with mean
    ``np.ones(n_features) + k * FACTOR``. All classes share one covariance matrix, which is built once
    as ``Q @ S @ Q.T`` from a random orthogonal matrix ``Q`` and a random diagonal matrix ``S``.

    Parameters
    ----------
    n_classes : int, default=2
        Number of classes in the generated dataset.
    n_features : int, default=2
        Number of features in the generated dataset.
    samples_per_class : int or dict, default=500
        Number of samples per class. If an integer is provided, it is assumed that all classes have the same
        number of samples. If a dictionary is provided, the keys should be class labels and values should be
        the number of samples for each class. Any other type raises ``ValueError``.
    flip_y : float, default=0.1
        The fraction of samples whose class labels will be randomly flipped to simulate noise.
    random_state : int or RandomState instance, default=42
        Random state for reproducibility. Passed through ``sklearn.utils.check_random_state``.
    fold_id : int, default=0
        Fold ID used for random seed generation. It is added to every seed drawn for the
        covariance matrix and for the per-class distributions.
    imbalance : float, default=0.0
        Proportion of the minority class in the dataset. Must be between 0 and 1.
        The value is only stored; no method visible in this class reads it.
    gen_type : str, default='single'
        Type of generation process. It can be used to modify the dataset generation method.
        The value is only stored; no method visible in this class reads it.
    **kwargs : dict
        Additional keyword arguments. They are accepted and ignored by the constructor.

    Attributes
    ----------
    n_classes : int
        Number of classes in the generated dataset.
    n_features : int
        Number of features in the generated dataset.
    random_state : RandomState instance
        Random state instance for reproducibility.
    fold_id : int
        Fold ID used for random seed generation.
    means : dict
        Dictionary storing the mean vectors for each class.
    covariances : dict
        Dictionary storing the covariance matrices for each class (the same matrix for every class).
    seeds : dict
        Dictionary storing the random seeds used for generating each class.
    samples_per_class : dict
        Dictionary storing the number of samples for each class.
    imbalance : float
        Proportion of the minority class in the dataset.
    gen_type : str
        Type of generation process.
    n_instances : int
        Total number of instances in the generated dataset (sum of ``samples_per_class`` values).
    class_labels : numpy.ndarray
        Array of class labels, ``np.arange(n_classes)``.
    y_prob : dict
        Dictionary storing the probability of each class
        (``samples_per_class[k] / n_instances``).
    ent_y : float or None
        Entropy of the class distribution. ``None`` until ``bayes_predictor_mi`` has been called.
    flip_y_prob : dict
        Dictionary storing the probability of flipped class labels for each class, computed as
        ``y_prob[k] * (1 - flip_y) + flip_y / n_classes`` and rounded to 4 decimals.
    flip_y : float
        The fraction of samples whose class labels will be randomly flipped to simulate noise.
    logger : logging.Logger
        Logger instance for logging information.

    Private Methods
    ---------------
    __generate_cov_means__():
        Generate the mean vectors and covariance matrices for each class.
        This method creates a random orthogonal matrix and generates a positive semi-definite covariance matrix.
        It then calculates the mean vector for each class, and also fills ``seeds``, ``y_prob`` and
        ``flip_y_prob``. It is called once from the constructor.
    """
def __init__(
    self,
    n_classes=2,
    n_features=2,
    samples_per_class=500,
    flip_y=0.1,
    random_state=42,
    fold_id=0,
    imbalance=0.0,
    gen_type="single",
    **kwargs,
):
    """Store the configuration and pre-compute the per-class distributions.

    Parameters
    ----------
    n_classes : int, default=2
        Number of classes.
    n_features : int, default=2
        Number of features.
    samples_per_class : int or dict, default=500
        Either one count used for every class, or a mapping from class label
        to count. NumPy integer scalars are accepted as well as ``int``.
    flip_y : float, default=0.1
        Fraction of labels to flip when a dataset is generated.
    random_state : int or RandomState instance, default=42
        Seed or random state, normalised with ``check_random_state``.
    fold_id : int, default=0
        Offset added to the seeds drawn while building the distributions.
    imbalance : float, default=0.0
        Stored on the instance; not read by the constructor.
    gen_type : str, default="single"
        Stored on the instance; not read by the constructor.
    **kwargs : dict
        Accepted for call-site compatibility and ignored.

    Raises
    ------
    ValueError
        If ``samples_per_class`` is neither an integer nor a dict, or if a
        dict does not provide a count for every label in ``range(n_classes)``.
    """
    self.n_classes = n_classes
    self.n_features = n_features
    self.random_state = check_random_state(random_state)
    self.fold_id = fold_id
    self.means = {}
    self.covariances = {}
    self.seeds = {}
    if isinstance(samples_per_class, (int, np.integer)):
        # np.integer covers counts coming from NumPy/pandas (e.g. np.int64).
        self.samples_per_class = dict.fromkeys(range(n_classes), int(samples_per_class))
    elif isinstance(samples_per_class, dict):
        # Normalise keys to int so string/NumPy keys address the same entries.
        self.samples_per_class = {int(key): value for key, value in samples_per_class.items()}
        missing = [label for label in range(n_classes) if label not in self.samples_per_class]
        if missing:
            # Fail here with a clear message instead of a KeyError further down.
            raise ValueError(
                f"Samples per class is not defined properly: missing class labels {missing}"
            )
    else:
        raise ValueError("Samples per class is not defined properly")
    self.imbalance = imbalance
    self.gen_type = gen_type
    self.n_instances = sum(self.samples_per_class.values())
    self.class_labels = np.arange(self.n_classes)
    self.y_prob = {}
    self.ent_y = None
    self.flip_y_prob = {}
    self.flip_y = flip_y
    # Create the logger before running any helper so helpers may log safely.
    self.logger = logging.getLogger(SyntheticDatasetGenerator.__name__)
    self.__generate_cov_means__()
[docs]
def __generate_cov_means__(self):
    """Build the shared covariance matrix and the per-class parameters.

    Fills ``means``, ``covariances``, ``seeds``, ``y_prob`` and
    ``flip_y_prob`` for every label in ``class_labels``.

    The covariance is ``Q @ S @ Q.T`` with ``Q`` a random orthogonal matrix
    and ``S`` a diagonal matrix of uniform(0, 1) draws, which makes it
    symmetric positive semi-definite. Both ``Q`` and ``S`` are drawn from the
    same seeded ``RandomState`` so the result is reproducible for a given
    ``random_state`` and ``fold_id``.
    """
    # int(...) keeps the seed arithmetic in Python integers, independent of
    # NumPy's scalar promotion rules.
    seed = int(self.random_state.randint(2**31, dtype="uint32")) + self.fold_id
    rs = np.random.RandomState(seed=seed)
    # Draw the eigenvalues first: they are the first numbers taken from `rs`,
    # exactly as before, and only afterwards is `rs` used for Q.
    S = np.diag(np.diag(rs.rand(self.n_features, self.n_features)))
    # Passing random_state makes Q reproducible; without it SciPy falls back
    # to the global NumPy random state.
    Q = ortho_group.rvs(dim=self.n_features, random_state=rs)
    cov = np.dot(np.dot(Q, S), np.transpose(Q))
    for k_class in self.class_labels:
        seed = int(self.random_state.randint(2**31, dtype="uint32")) + self.fold_id
        # Class means are shifted copies of the all-ones vector.
        mean = np.ones(self.n_features) + (k_class * FACTOR)
        self.means[k_class] = mean
        self.covariances[k_class] = cov
        self.seeds[k_class] = seed
        self.y_prob[k_class] = self.samples_per_class[k_class] / self.n_instances
        # NOTE(review): this mixes the prior with a uniform 1/n_classes noise
        # term, while the imbalanced branch of generate_dataset redraws flipped
        # labels from the class priors — confirm which model is intended.
        self.flip_y_prob[k_class] = (
            self.y_prob[k_class] * (1 - self.flip_y) + self.flip_y / self.n_classes
        )
        self.flip_y_prob[k_class] = np.around(self.flip_y_prob[k_class], 4)
[docs]
def get_prob_dist_x_given_y(self, k_class):
    """Return the frozen class-conditional distribution p(x | y = k_class).

    Parameters
    ----------
    k_class : int
        Class label whose distribution is requested.

    Returns
    -------
    scipy.stats._multivariate.multivariate_normal_frozen
        Multivariate normal built from the stored mean, covariance and seed
        of ``k_class``. A new frozen object is created on every call.
    """
    params = {
        "mean": self.means[k_class],
        "cov": self.covariances[k_class],
        "seed": self.seeds[k_class],
    }
    return multivariate_normal(**params)
[docs]
def get_prob_fn_margx(self):
    """Build a function returning the per-class joint terms p(y) * p(x | y).

    Returns
    -------
    marg_x : function
        Callable taking the input data ``x`` and returning an array with one
        entry (row) per class label, in ``class_labels`` order. Summing the
        result over axis 0 gives the marginal density of ``x``.
    """

    def marg_x(x):
        joint_terms = []
        for k_class in self.class_labels:
            prior = self.y_prob[k_class]
            density = pdf(self.get_prob_dist_x_given_y(k_class), x)
            joint_terms.append(prior * density)
        return np.array(joint_terms)

    return marg_x
[docs]
def get_prob_x_given_y(self, X, class_label):
    """Evaluate the class-conditional density p(X | y = class_label).

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Input data.
    class_label : int
        Class label whose distribution is evaluated.

    Returns
    -------
    prob_x_given_y : array-like
        Result of the ``pdf`` helper applied to the class distribution and ``X``.
    """
    return pdf(self.get_prob_dist_x_given_y(class_label), X)
[docs]
def get_prob_y_given_x(self, X, class_label):
    """Compute the posterior p(y = class_label | X) with Bayes' rule.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Input data.
    class_label : int
        Class label whose posterior probability is requested.

    Returns
    -------
    prob_y_given_x : array-like
        ``p(y) * p(X | y)`` for ``class_label`` divided by the same quantity
        summed over all classes.
    """
    # Denominator: marginal density of X, summed over the per-class terms.
    evidence = self.get_prob_fn_margx()(X).sum(axis=0)
    # Numerator: joint term of the requested class only.
    class_density = pdf(self.get_prob_dist_x_given_y(class_label), X)
    joint = self.y_prob[class_label] * class_density
    return joint / evidence
[docs]
def get_prob_flip_y_given_x(self, X, class_label):
    """Compute the posterior of the noisy (possibly flipped) label given X.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Input data.
    class_label : int
        Class label whose noisy posterior is requested.

    Returns
    -------
    prob_y_given_x : array-like
        ``(1 - flip_y) * p(y | X) + flip_y * p(y)`` for ``class_label``.
    """
    clean_part = (1 - self.flip_y) * self.get_prob_y_given_x(X, class_label)
    # Flipped labels contribute in proportion to the class prior.
    noise_part = self.flip_y * self.y_prob[class_label]
    return clean_part + noise_part
[docs]
def get_prob_x_given_flip_y(self, X, class_label):
    """Compute the noisy-label posterior multiplied by the marginal density of X.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Input data.
    class_label : int
        The flipped class label to evaluate.

    Returns
    -------
    prob_x_given_flip_y : array-like
        ``get_prob_flip_y_given_x(X, class_label)`` times the marginal density
        of ``X``. No division by the label probability is performed here.
    """
    noisy_posterior = self.get_prob_flip_y_given_x(X, class_label)
    evidence = self.get_prob_fn_margx()(X).sum(axis=0)
    return noisy_posterior * evidence
[docs]
def generate_samples_for_class(self, k_class):
    """Draw the configured number of samples for one class.

    Parameters
    ----------
    k_class : int
        The class label for which to generate samples.

    Returns
    -------
    data : numpy.ndarray of shape (n_samples, n_features)
        Samples drawn from the class-conditional normal distribution. The
        array is always two-dimensional, also when only one sample is drawn.
    labels : numpy.ndarray of shape (n_samples,)
        ``int64`` array filled with ``k_class``.
    """
    # A fresh seed per call keeps successive datasets different but reproducible.
    seed = self.random_state.randint(2**32, dtype="uint32")
    mvn = self.get_prob_dist_x_given_y(k_class)
    n_samples = self.samples_per_class[k_class]
    draws = mvn.rvs(n_samples, random_state=seed)
    # SciPy squeezes its output (one sample -> 1-D); restore the 2-D layout
    # so callers can rely on a (n_samples, n_features) matrix.
    data = np.asarray(draws).reshape(n_samples, self.n_features)
    labels = np.full(n_samples, k_class, dtype="int64")
    return data, labels
[docs]
def generate_dataset(self):
    """Generate the full synthetic dataset, including label noise.

    Samples are drawn class by class and stacked in ``class_labels`` order.
    When ``flip_y > 0`` labels are then perturbed:

    * if every class has the same number of samples, ``int(flip_y * n_k)``
      indices are picked per class and their labels are shuffled among
      themselves;
    * otherwise every sample is selected independently with probability
      ``flip_y`` and the selected labels are redrawn from the class priors
      ``samples_per_class[k] / n_instances``.

    Returns
    -------
    X : numpy.ndarray of shape (n_samples, n_features)
        Feature matrix.
    y : numpy.ndarray of shape (n_samples,)
        Target vector after label flipping.
    """

    def _label_counts(labels):
        # Absolute and relative frequency of every label present in `labels`.
        values, counts = np.unique(labels, return_counts=True)
        return dict(zip(values, counts)), dict(zip(values, counts / np.sum(counts)))

    features = []
    targets = []
    for k_class in self.class_labels:
        data, labels = self.generate_samples_for_class(k_class)
        features.append(data)
        targets.append(labels)
    if features:
        # Stack once instead of growing the arrays inside the loop.
        X = np.vstack(features)
        y = np.concatenate(targets)
    else:
        X = np.empty((0, self.n_features))
        y = np.empty(0, dtype="int64")

    if self.flip_y > 0:
        if len(np.unique(list(self.samples_per_class.values()))) == 1:
            self.logger.info("################# Balanced Dataset #################")
            indices = []
            for k_class in self.class_labels:
                flip_samples = int(self.flip_y * self.samples_per_class[k_class])
                chosen = self.random_state.choice(
                    np.where(y == k_class)[0], flip_samples, replace=False
                )
                indices.extend(list(chosen))
            d, _ = _label_counts(y)
            self.logger.info(f"Original {d}")
            self.logger.info(f"Probs {self.y_prob}")
            # Shuffling the selected labels keeps the class counts unchanged.
            y[indices] = shuffle(y[indices], random_state=self.random_state)
            d, d_o = _label_counts(y)
            self.logger.info(f"After Flipping {d}")
            self.logger.info(f"Flipping Ratio {d_o}")
        else:
            self.logger.info("################# Imbalanced Dataset #################")
            d, _ = _label_counts(y)
            self.logger.info(f"Original {d}")
            self.logger.info(f"Probs {self.y_prob}")
            # One vectorised draw consumes the random stream exactly like one
            # draw per sample, without the per-call overhead.
            choices = self.random_state.choice(
                2, size=len(y), p=[1 - self.flip_y, self.flip_y]
            )
            indices = np.flatnonzero(choices == 1)
            p = np.array(
                [
                    self.samples_per_class[int(k)] / self.n_instances
                    for k in range(self.n_classes)
                ]
            )
            y_old = np.copy(y)
            # A redrawn label may equal the old one, so the realised flip
            # rate is usually below flip_y.
            y[indices] = self.random_state.choice(self.n_classes, size=len(indices), p=p)
            self.logger.info(f"Actual Flip {self.flip_y} Flips {np.mean(y_old != y)}")
            self.logger.info(f"Chosen flips {np.mean(choices)}")
            d, d_o = _label_counts(y)
            self.logger.info(f"After Flipping {d}")
            self.logger.info(f"Flipping Ratio {d_o}")
        self.logger.info(f"Y_prob {self.y_prob}")
        self.logger.info(f"Probs after flipping {self.flip_y_prob}")
    return X, y
[docs]
def entropy_y(self, y):
    """Calculate the entropy (in bits) of the class distribution.

    The entropy is computed from the configured class priors ``y_prob``.
    The labels ``y`` are used only to log the empirical class frequencies
    next to the priors.

    Parameters
    ----------
    y : array-like of shape (n_samples,)
        The labels of the dataset.

    Returns
    -------
    mi_pp : float
        The entropy of the class distribution.
    """
    uni, counts = np.unique(y, return_counts=True)
    empirical = dict(zip(uni, counts / np.sum(counts)))
    mi_pp = 0.0
    for k_class in self.class_labels:
        prior = self.y_prob[k_class]
        # A zero-probability class contributes nothing (0 * log 0 := 0);
        # evaluating it directly would yield NaN.
        if prior > 0:
            mi_pp -= prior * np.log2(prior)
        # A class may be absent from y; report frequency 0.0 instead of failing.
        self.logger.info(f"{k_class}: {empirical.get(k_class, 0.0)}, {prior}")
    return mi_pp
[docs]
def calculate_mi(self):
    """Calculate the mutual information (MI) using the probability
    distribution function using the formulae below.

    .. math::
        I(X;Y) = H(X) - H(X|Y)

    For every class a dataset is generated and the mean ``log2`` density
    ratio of the samples carrying that label is computed:

    * ``flip_y == 0``: ratio ``p(x | y) / p(x)``;
    * otherwise: ratio ``p_flip(y | x) / p(y)``.

    The per-class means are weighted by the class prior ``y_prob`` and summed.

    Returns
    -------
    mutual_information : float
        The mutual information of the dataset.
    """
    # NOTE(review): the nesting below (retry loop, then sign flip, then append)
    # is the natural reading of the statements — confirm against the original.
    x_y_prob_list = []
    for k_class in self.class_labels:
        prob_list = -1
        nter = 0
        # Regenerate the dataset until the estimate is non-negative, at most
        # 100 times. A NaN estimate also ends the loop (NaN < 0 is False).
        while prob_list < 0:
            X, y = self.generate_dataset()
            # Keep only the samples whose (possibly flipped) label is k_class.
            ind = np.where(y == k_class)[0]
            data = X[ind, :]
            if self.flip_y == 0.0:
                x_y_prob = self.get_prob_x_given_y(X=data, class_label=k_class)
                marg_x = self.get_prob_fn_margx()
                p_x_marg = marg_x(data).sum(axis=0)
                a_log_x_prob = x_y_prob / p_x_marg
            else:
                x_y_prob = self.get_prob_flip_y_given_x(X=data, class_label=k_class)
                a_log_x_prob = x_y_prob / self.y_prob[k_class]
            prob_list = np.nanmean(np.log2(a_log_x_prob))
            nter = nter + 1
            if nter >= 100:
                break
        # If all retries stayed negative, the magnitude of the last estimate is used.
        if prob_list < 0:
            prob_list = -1 * prob_list
        # NOTE(review): both branches are identical; the condition has no effect.
        if self.flip_y == 0.0:
            x_y_prob_list.append(prob_list * self.y_prob[k_class])
        else:
            x_y_prob_list.append(prob_list * self.y_prob[k_class])
    # nansum ignores classes whose estimate is NaN.
    mutual_information = np.nansum(x_y_prob_list)
    return mutual_information
[docs]
def bayes_predictor_mi(self):
    """Estimate the mutual information from the Bayes predictor's posteriors.

    A dataset is generated and the posterior of every class is evaluated on
    it (the noisy-label posterior when ``flip_y != 0``). The estimate is

    .. math::
        I(X;Y) = H(Y) - H(Y|X)

    where ``-H(Y|X)`` is the sample mean of ``sum_k p(k|x) * log2 p(k|x)``
    and ``H(Y)`` comes from ``entropy_y``. The entropy is also stored in
    ``ent_y``.

    Returns
    -------
    mutual_information : float
        The estimate, clipped below at 0.
    """
    X, y = self.generate_dataset()
    posterior = np.zeros((X.shape[0], self.n_classes))
    for label in self.class_labels:
        if self.flip_y == 0.0:
            column = self.get_prob_y_given_x(X=X, class_label=label)
        else:
            column = self.get_prob_flip_y_given_x(X=X, class_label=label)
        posterior[:, label] = column
    # Nudge exact 0 and 1 so that log2 stays finite.
    tiny = np.finfo(float).eps
    posterior[posterior == 0] = tiny
    posterior[posterior == 1] = 1 - tiny
    self.logger.info(f"Sum {posterior.sum(axis=1)}")
    self.logger.info(f"Sum {posterior.sum()}, n_instances {self.n_instances}")
    per_sample = np.sum(posterior * np.log2(posterior), axis=1)
    neg_cond_entropy = np.mean(per_sample)
    self.ent_y = self.entropy_y(y)
    estimate = neg_cond_entropy + self.ent_y
    self.logger.info(f"mi_bp {neg_cond_entropy} mi_pp {self.ent_y}")
    return np.max([estimate, 0.0])
[docs]
def bayes_predictor_pc_softmax_mi(self):
    r"""Calculate the mutual information (MI) using class probabilities
    derived from the PDF of a class label given the input data X, applying
    both the Softmax and PC-Softmax functions.

    .. math::
        I(X;Y) = H(Y) - H(Y|X)

    Softmax Function:

    .. math::
        S(z_k) = \frac{e^{z_k}}{\sum_{j=1}^{K} e^{z_j}}

    where:

    - \( z_k \) is the logit or raw score for class \( k \).
    - \( K \) is the total number of classes.

    PC-Softmax Function:

    .. math::
        S_{pc}(z_k) = \frac{e^{z_k}}{\sum_{j=1}^{K} e^{z_j} \cdot p_j}

    where:

    - \( z_k \) is the logit or raw score for class \( k \).
    - \( p_j = \frac{\text{counts}_j}{\text{total samples}} \) is the prior probability of class \( j \)

    In this method the scores \( z_k \) are the Bayes posterior probabilities
    of the generated samples (the noisy-label posterior when ``flip_y != 0``).
    The PC-softmax estimate is the mean ``log2`` PC-softmax value of the
    observed label; the softmax estimate is the mean ``log2`` softmax value
    of the observed label plus ``log2(n_classes)``. NaN terms are ignored.

    Returns
    -------
    softmax_emi : float
        Estimated softmax mutual information.
    pc_softmax_emi : float
        Estimated PC-softmax mutual information.
    """
    X, y = self.generate_dataset()
    y_pred = np.zeros((X.shape[0], self.n_classes))
    for k_class in self.class_labels:
        if self.flip_y == 0.0:
            y_pred[:, k_class] = self.get_prob_y_given_x(X=X, class_label=k_class)
        else:
            y_pred[:, k_class] = self.get_prob_flip_y_given_x(X=X, class_label=k_class)
    eps = np.finfo(float).eps
    y_pred[y_pred == 0] = eps
    y_pred[y_pred == 1] = 1 - eps

    y_idx = np.asarray(y).astype(int)
    # bincount with minlength yields one prior per class even when a class is
    # missing from y, so the priors always broadcast against the score columns.
    counts = np.bincount(y_idx, minlength=self.n_classes)
    pys = counts / np.sum(counts)

    normal_softmaxes = np.asarray(softmax(y_pred))
    x_exp = np.exp(y_pred)
    x_exp_sum = np.sum(x_exp * pys, axis=1, keepdims=True)
    pc_softmaxes = x_exp / x_exp_sum

    # Pick, for every sample, the entry belonging to its observed label.
    rows = np.arange(y_idx.shape[0])
    pc_softmax_mis = np.log2(pc_softmaxes[rows, y_idx])
    softmax_mis = np.log2(normal_softmaxes[rows, y_idx]) + np.log2(self.n_classes)

    pc_softmax_emi = np.nanmean(pc_softmax_mis)
    softmax_emi = np.nanmean(softmax_mis)
    return softmax_emi, pc_softmax_emi
[docs]
def get_bayes_mi(self, metric_name=MCMC_LOG_LOSS):
    """Get the estimated mutual information based on the specified metric.

    Only the estimator selected by ``metric_name`` is run, so no extra
    dataset is generated (and no extra random numbers are consumed) for
    estimators whose result would be discarded.

    Parameters
    ----------
    metric_name : {`MCMCBayesMI`, `MCMCLogLossBayesMI`, `MCMCPCSoftmaxBayesMI`, `MCMCSoftmaxBayesMI`}, default=`MCMCLogLossBayesMI`
        The name of the metric to use for MI estimation.
        Must be one of:

        - `MCMCLogLossBayesMI`: Estimate mutual information using the log loss of the Bayes predictor.
        - `MCMCBayesMI`: Estimate mutual information using the marginal of inputs and conditionals on inputs using class labels
        - `MCMCPCSoftmaxBayesMI`: Estimate mutual information using the MCMC PC Softmax Bayes method.
        - `MCMCSoftmaxBayesMI`: Estimate mutual information using the MCMC Softmax Bayes method.

        Any other value yields 0.0.

    Returns
    -------
    mutual_information : float
        The estimated mutual information based on the selected metric,
        clipped below at 0.
    """
    if metric_name == MCMC_LOG_LOSS:
        mutual_information = self.bayes_predictor_mi()
    elif metric_name == MCMC_MI_ESTIMATION:
        mutual_information = self.calculate_mi()
    elif metric_name in (MCMC_PC_SOFTMAX, MCMC_SOFTMAX):
        softmax_emi, pc_softmax_emi = self.bayes_predictor_pc_softmax_mi()
        if metric_name == MCMC_PC_SOFTMAX:
            mutual_information = pc_softmax_emi
        else:
            mutual_information = softmax_emi
    else:
        # Keep the historical result for unknown names, but make it visible.
        self.logger.warning(f"Unknown metric name {metric_name}, returning 0.0")
        mutual_information = 0
    mutual_information = np.max([mutual_information, 0.0])
    return mutual_information