Source code for autoqild.mi_estimators.pc_softmax_estimator

"""MI estimator that uses probability-corrected softmax functions to assess the
information content in classification scenarios."""

import logging
import math

import numpy as np
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader

from autoqild.mi_estimators.mi_base_class import MIEstimatorBase
from .neural_networks_torch import ClassNet
from .pytorch_utils import get_optimizer_and_parameters, init, own_softmax


class PCSoftmaxMIEstimator(MIEstimatorBase):
    """Estimate Mutual Information (MI) with a softmax / PC-softmax classifier.

    A ``ClassNet`` neural network is trained to classify the inputs; the MI
    between the features and the class labels is then derived from the
    probability the network assigns to the true class of every sample. With
    ``is_pc_softmax=True`` the network is given the class proportions of the
    data (probability-corrected softmax), which is meant to help with
    imbalanced data.

    Parameters
    ----------
    n_classes : int
        Number of classes in the classification task.
    n_features : int
        Number of features or dimensionality of the input data.
    n_hidden : int, optional, default=10
        Number of hidden layers in the neural network.
    n_units : int, optional, default=100
        Number of units in each hidden layer.
    loss_function : torch.nn.Module, optional, default=torch.nn.NLLLoss()
        Loss function to be used during training.
    optimizer_str : {`RMSprop`, `sgd`, `adam`, `AdamW`, `Adagrad`, `Adamax`, `Adadelta`}, default=`adam`
        Optimizer type to use for training the neural network. Must be one of:

        - `RMSprop`: Root Mean Square Propagation, an adaptive learning rate method.
        - `sgd`: Stochastic Gradient Descent, a simple and widely-used optimizer.
        - `adam`: Adaptive Moment Estimation, combining momentum and RMSProp.
        - `AdamW`: Adam with weight decay, a variant of Adam with better regularization.
        - `Adagrad`: Adaptive Gradient Algorithm, adjusting the learning rate per feature.
        - `Adamax`: Variant of Adam based on the infinity norm.
        - `Adadelta`: An extension of Adagrad that reduces its aggressive learning rate decay.
    learning_rate : float, optional, default=0.001
        Learning rate for the optimizer.
    reg_strength : float, optional, default=0.001
        Regularization strength for the optimizer.
    is_pc_softmax : bool, optional, default=False
        If True, use the custom softmax function that accounts for label proportions.
    random_state : int, optional, default=42
        Seed for random number generation to ensure reproducibility.

    Attributes
    ----------
    logger : logging.Logger
        Logger for logging messages and errors.
    optimizer : torch.optim.Optimizer
        Optimizer used for training the neural network (created in ``fit``).
    class_net : ClassNet
        Instance of the neural network used for classification (created in ``fit``).
    dataset_properties : list
        Proportions of each class in the training data (set in ``fit``).
    final_loss : float
        Sum of the batch losses accumulated over the whole training run.
    mi_val : float
        Estimated mutual information after training.
    device : torch.device
        Device used for computation (CPU or GPU).
    """

    def __init__(
        self,
        n_classes,
        n_features,
        n_hidden=10,
        n_units=100,
        loss_function=nn.NLLLoss(),
        optimizer_str="adam",
        learning_rate=0.001,
        reg_strength=0.001,
        is_pc_softmax=False,
        random_state=42,
    ):
        super().__init__(n_classes=n_classes, n_features=n_features, random_state=random_state)
        self.logger = logging.getLogger(PCSoftmaxMIEstimator.__name__)
        self.optimizer_str = optimizer_str
        self.learning_rate = learning_rate
        self.reg_strength = reg_strength
        self.optimizer_cls, self._optimizer_config = get_optimizer_and_parameters(
            optimizer_str, learning_rate, reg_strength
        )
        self.is_pc_softmax = is_pc_softmax
        self.n_hidden = n_hidden
        self.n_units = n_units
        self.loss_function = loss_function
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Everything below is (re)created by ``fit``.
        self.optimizer = None
        self.class_net = None
        self.dataset_properties = None
        self.final_loss = 0
        self.mi_val = 0

    def __pytorch_tensor_dataset__(self, X, y, batch_size=32, shuffle=True):
        """Create the class proportions and a PyTorch data loader for the data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Feature matrix.
        y : array-like of shape (n_samples,)
            Target labels.
        batch_size : int, optional, default=32
            Number of samples per batch.
        shuffle : bool, optional, default=True
            Whether the loader draws the samples in random order. Pass False
            whenever the order of the batches must match the order of ``X``.

        Returns
        -------
        dataset_prop : list
            Proportion of every distinct label found in ``y``, ordered by
            sorted label value.
        tra_dataloader : torch.utils.data.DataLoader
            DataLoader over ``(X, y)``; it yields CPU tensors.
        """
        _, counts = np.unique(y, return_counts=True)
        total = len(y)
        dataset_prop = [count / total for count in counts]

        # The dataset tensors stay on the CPU: only CPU tensors can be pinned,
        # and every consumer moves its batch to ``self.device`` itself.
        tensor_x = torch.tensor(X, dtype=torch.float32)
        tensor_y = torch.tensor(y, dtype=torch.int64)
        my_dataset = TensorDataset(tensor_x, tensor_y)
        tra_dataloader = DataLoader(
            my_dataset,
            # The data is already in memory, so a worker process adds nothing.
            num_workers=0,
            batch_size=batch_size,
            shuffle=shuffle,
            drop_last=False,
            # Pinning only pays off (and is only valid) for host-to-GPU copies.
            pin_memory=self.device.type == "cuda",
        )
        return dataset_prop, tra_dataloader

    def _run_network(self, X, use_score):
        """Run the trained network on ``X`` without labels, keeping the row order.

        ``use_score`` selects ``class_net.score`` instead of the plain forward
        call. The class proportions stored by ``fit`` are passed to the network.
        """
        tensor_x = torch.tensor(X, dtype=torch.float32).to(self.device)
        network_call = self.class_net.score if use_score else self.class_net
        with torch.no_grad():
            return network_call(tensor_x, self.dataset_properties)

    def fit(self, X, y, epochs=50, verbose=0, **kwd):
        """Fit the neural network to the data.

        A fresh ``ClassNet`` and optimizer are created on every call, the
        network is trained for ``epochs`` passes over the data and the mutual
        information is estimated on the training data afterwards.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : array-like of shape (n_samples,)
            Target labels.
        epochs : int, optional, default=50
            Number of training epochs.
        verbose : int, optional, default=0
            Verbosity level. If truthy, loss and accuracy are reported every
            10th epoch.
        **kwd : dict, optional
            Additional keyword arguments (unused).

        Returns
        -------
        self : PCSoftmaxMIEstimator
            Fitted estimator.
        """
        self.class_net = ClassNet(
            in_dim=self.n_features,
            out_dim=self.n_classes,
            n_hidden=self.n_hidden,
            n_units=self.n_units,
            device=self.device,
            is_pc_softmax=self.is_pc_softmax,
        )
        self.class_net.apply(init)
        self.class_net.to(self.device)
        self.optimizer = self.optimizer_cls(self.class_net.parameters(), **self._optimizer_config)

        dataset_prop, tra_dataloader = self.__pytorch_tensor_dataset__(X, y)
        self.dataset_properties = dataset_prop
        # Reset the state of a previous fit. ``estimate_mi`` keeps the larger of
        # ``self.mi_val`` and the new estimate, so a stale value would leak in.
        self.final_loss = 0.0
        self.mi_val = 0

        for epoch in range(1, epochs + 1):
            report = bool(verbose) and epoch % 10 == 0
            correct = 0
            seen = 0
            running_loss = 0.0
            for tensor_x, tensor_y in tra_dataloader:
                tensor_x = tensor_x.to(self.device)
                # reshape(-1) flattens (B, 1) labels like squeeze() did, but
                # keeps a one-sample batch 1-D instead of 0-D.
                tensor_y = tensor_y.to(self.device).reshape(-1)

                # Gradients accumulate by default; clear them for every batch.
                self.optimizer.zero_grad()
                preds_ = self.class_net(tensor_x, dataset_prop)
                loss = self.loss_function(preds_, tensor_y)
                loss.backward()
                self.optimizer.step()

                # item() yields a plain float on any device and drops the graph.
                running_loss += loss.item()
                if report:
                    _, predicted = torch.max(preds_.detach(), 1)
                    correct += (predicted == tensor_y).sum().item()
                    seen += tensor_y.size(0)

            self.final_loss += running_loss
            if report:
                accuracy = 100 * correct / seen if seen else 0.0
                message = f"For Epoch: {epoch} Running loss: {running_loss} Accuracy: {accuracy} %"
                print(message)
                self.logger.info(message)

        self.mi_val = self.estimate_mi(X, y, verbose=0)
        self.logger.info(f"Fit Loss {self.final_loss} MI Val: {self.mi_val}")
        return self

    def predict(self, X, verbose=0):
        """Predict class labels for the input samples.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Feature matrix.
        verbose : int, optional, default=0
            Verbosity level (unused).

        Returns
        -------
        y_pred : array-like of shape (n_samples,)
            Predicted class labels, in the same order as the rows of ``X``.
        """
        outputs = self._run_network(X, use_score=False)
        _, predicted = torch.max(outputs, 1)
        return predicted.cpu().numpy()

    def score(self, X, y, sample_weight=None, verbose=0):
        """Compute the score of the neural network.

        Accuracy and the mean of ``sum(p * log2(p))`` over the predicted
        probabilities are only logged; the returned score is the negative loss.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Feature matrix.
        y : array-like of shape (n_samples,)
            True labels for "X".
        sample_weight : array-like of shape (n_samples,), optional
            Sample weights (unused).
        verbose : int, optional, default=0
            Verbosity level (unused).

        Returns
        -------
        score : float
            Negative loss of the model on the validation data.
        """
        y_pred = self.predict(X, verbose=0)
        acc = np.mean(np.ravel(y) == y_pred)
        # A diverged training run must not be rewarded with a lucky accuracy.
        if np.isnan(self.final_loss) or np.isinf(self.final_loss):
            acc = 0.0

        s_pred = self.predict_proba(X, verbose=0)
        # 0 * log2(0) evaluates to NaN; nansum counts such terms as 0.
        with np.errstate(divide="ignore", invalid="ignore"):
            pyx = np.nansum(s_pred * np.log2(s_pred), axis=1).mean()

        dataset_prop, test_dataloader = self.__pytorch_tensor_dataset__(
            X, y, batch_size=X.shape[0], shuffle=False
        )
        val_loss = 0.0
        with torch.no_grad():
            for a_data, a_label in test_dataloader:
                a_data = a_data.to(self.device)
                a_label = a_label.to(self.device).reshape(-1)
                preds_ = self.class_net(a_data, dataset_prop)
                val_loss += self.loss_function(preds_, a_label).item()

        self.logger.info(
            f"Loss {self.final_loss} Accuracy {acc} pyx {pyx} MI {self.mi_val} Val loss {val_loss}"
        )
        return -val_loss

    def predict_proba(self, X, verbose=0):
        """Predict class probabilities for the input samples.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Feature matrix.
        verbose : int, optional, default=0
            Verbosity level (unused).

        Returns
        -------
        p_pred : array-like of shape (n_samples, n_classes)
            Output of ``class_net.score``, in the same order as the rows of ``X``.
        """
        return self._run_network(X, use_score=True).cpu().numpy()

    def decision_function(self, X, verbose=0):
        """Compute the decision function in form of class probabilities for the input samples.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Feature matrix.
        verbose : int, optional, default=0
            Verbosity level (unused).

        Returns
        -------
        scores : array-like of shape (n_samples, n_classes)
            Output of ``class_net.score``, in the same order as the rows of
            ``X``. If ``X`` has no rows, a uniform array of that shape is
            returned without calling the network.
        """
        n_samples = X.shape[0]
        if n_samples == 0:
            # Nothing to score: fall back to the uniform distribution.
            return np.full((n_samples, self.n_classes), 1.0 / self.n_classes)
        return self._run_network(X, use_score=True).cpu().numpy()

    def estimate_mi(self, X, y, verbose=1, **kwargs):
        r"""Estimate Mutual Information using the trained neural network using the Softmax and PC-Softmax loss functions.

        .. math::

            I(X;Y) = H(Y) - H(Y|X)

        Softmax Function:

        .. math::

            S(z_k) = \frac{e^{z_k}}{\sum_{j=1}^{K} e^{z_j}}

        where:

        - \( z_k \) is the logit or raw score for class \( k \).
        - \( K \) is the total number of classes.

        PC-Softmax Function:

        .. math::

            S_{pc}(z_k) = \frac{e^{z_k}}{\sum_{j=1}^{K} e^{z_j} \cdot p_j}

        where:

        - \( z_k \) is the logit or raw score for class \( k \).
        - \( p_j = \frac{\text{counts}_j}{\text{total samples}} \) is the prior probability of class \( j \)

        The estimate is the mean over all samples of the base-2 logarithm of
        the value assigned to the true class; for the plain softmax,
        ``log2`` of the number of distinct labels in ``y`` is added. Samples
        whose term is NaN are skipped, a NaN or infinite mean counts as 0, and
        the result is never negative. If the stored ``mi_val`` exceeds the new
        estimate by more than 0.01, ``mi_val`` is returned instead.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input data.
        y : array-like of shape (n_samples,)
            Target labels.
        verbose : int, optional, default=1
            Verbosity level (unused).
        **kwargs : dict, optional
            Additional keyword arguments (unused).

        Returns
        -------
        mi_estimated : float
            The estimated mutual information.
        """
        dataset_prop, test_dataset = self.__pytorch_tensor_dataset__(
            X, y, batch_size=1, shuffle=False
        )
        # Plain softmax needs the log2(K) offset; PC-softmax does not.
        if self.is_pc_softmax or not dataset_prop:
            offset = 0.0
        else:
            offset = math.log2(len(dataset_prop))

        true_class_values = []
        with torch.no_grad():
            for a_data, a_label in test_dataset:
                int_label = a_label.cpu().item()
                a_data = a_data.unsqueeze(0).to(self.device)
                test_ = self.class_net(a_data, dataset_prop)
                if self.is_pc_softmax:
                    values = own_softmax(test_, dataset_prop, self.device)
                else:
                    values = torch.softmax(test_, dim=-1)
                true_class_values.append(torch.flatten(values)[int_label].cpu().item())

        if true_class_values:
            # np.log2 maps 0 to -inf and negatives to NaN instead of raising
            # like math.log2, so the NaN/inf handling below can deal with them.
            with np.errstate(divide="ignore", invalid="ignore"):
                log_terms = np.log2(np.asarray(true_class_values, dtype=np.float64)) + offset
            if np.all(np.isnan(log_terms)):
                mi_estimated = 0.0
            else:
                mi_estimated = np.nanmean(log_terms)
        else:
            mi_estimated = 0.0

        if np.isnan(mi_estimated) or np.isinf(mi_estimated):
            mi_estimated = 0.0
        # Never report clearly less than the estimate already stored.
        if self.mi_val - mi_estimated > 0.01:
            mi_estimated = self.mi_val
        return float(max(mi_estimated, 0.0))