# Source code for autoqild.detectors.mi_estimator_detector

"""Detects leakage by estimating mutual information using GMM or MINE
estimators."""

from .sklearn_leakage_detector import SklearnLeakageDetector
from ..mi_estimators import GMMMIEstimator, MineMIEstimatorMSE
from ..utilities import *

# Public API of this module: only the detector class is exported by
# ``from ... import *``.
__all__ = ["MIEstimationLeakageDetector"]


class MIEstimationLeakageDetector(SklearnLeakageDetector):
    """Detect information leakage through mutual information (MI) estimation.

    This class extends `SklearnLeakageDetector` and plugs in one of two MI
    estimators as the base detector:

    - MINE (Mutual Information Neural Estimator), via `MineMIEstimatorMSE`.
    - GMM (Gaussian Mixture Model), via `GMMMIEstimator`.

    Parameters
    ----------
    mi_technique : str
        The MI estimation technique to use. Options include:

        - `mine_mi_estimator`: Uses the MINE model to estimate mutual
          information.
        - `gmm_mi_estimator`: Uses the GMM model to estimate mutual
          information.
    padding_name : str
        The name of the padding method used in the experiments to obscure or
        detect leakage.
    learner_params : dict
        Parameters related to the machine learning models (learners) used in
        the detection process.
    fit_params : dict
        Parameters passed to the `fit` method during model training.
    hash_value : str
        A unique hash value used to identify and manage result files for a
        specific experiment.
    cv_iterations : int
        The number of cross-validation iterations to perform during model
        evaluation.
    n_hypothesis : int
        The number of hypotheses or models to be tested for leakage.
    base_directory : str
        The base directory where result files, logs, and backups are stored.
    search_space : dict
        The hyperparameter search space for Bayesian optimization.
    hp_iters : int
        The number of iterations for hyperparameter optimization.
    n_inner_folds : int
        The number of folds for inner cross-validation during hyperparameter
        optimization.
    validation_loss : str
        The loss function used to evaluate the performance of models during
        cross-validation.
    random_state : int or RandomState instance, optional
        Controls the randomness for reproducibility, ensuring consistent
        results across different runs.
    **kwargs : dict, optional
        Additional keyword arguments passed to the parent class.

    Raises
    ------
    ValueError
        If an invalid mutual information technique is specified, or if the
        detection method is not compatible with the selected MI estimator.

    Notes
    -----
    Only the estimated-mutual-information detection method (described by the
    error message as the one-sample t-test based method) is accepted.
    Attempting to use another detection method will result in a `ValueError`
    being raised.
    """

    def __init__(
        self,
        mi_technique,
        padding_name,
        learner_params,
        fit_params,
        hash_value,
        cv_iterations,
        n_hypothesis,
        base_directory,
        search_space,
        hp_iters,
        n_inner_folds,
        validation_loss,
        random_state=None,
        **kwargs,
    ):
        super().__init__(
            padding_name=padding_name,
            learner_params=learner_params,
            fit_params=fit_params,
            hash_value=hash_value,
            cv_iterations=cv_iterations,
            n_hypothesis=n_hypothesis,
            base_directory=base_directory,
            search_space=search_space,
            hp_iters=hp_iters,
            n_inner_folds=n_inner_folds,
            validation_loss=validation_loss,
            random_state=random_state,
            **kwargs,
        )

        # Select the estimator class and the parallelism that goes with it.
        if mi_technique == MINE_MI_ESTIMATOR:
            self.base_detector = MineMIEstimatorMSE
            self.n_jobs = 1
        elif mi_technique == GMM_MI_ESTIMATOR:
            self.base_detector = GMMMIEstimator
            self.n_jobs = 8
        else:
            raise ValueError(f"Invalid mutual information technique: {mi_technique}")

        # NOTE(review): `detection_method` is not assigned in this class; it
        # is presumably set by the parent constructor -- confirm.
        if self.detection_method != ESTIMATED_MUTUAL_INFORMATION:
            raise ValueError(
                "Only the one-sample t-test based detection method is compatible with mutual information estimation."
            )

    def __initialize_objects__(self):
        """Create an empty result slot for every hypothesis model.

        For each of the `n_hypothesis` models, `self.results["model_<i>"]` is
        (re)set to a dictionary holding one empty list under the
        `ESTIMATED_MUTUAL_INFORMATION` key. `fit` appends one MI estimate per
        cross-validation split to that list.

        Notes
        -----
        This method is intended for internal use only; it is presumably
        invoked by the parent class during initialization (not visible in
        this module).
        """
        for i in range(self.n_hypothesis):
            self.results[f"model_{i}"] = {ESTIMATED_MUTUAL_INFORMATION: []}

    def hyperparameter_optimization(self, X, y):
        """Run the parent class's hyperparameter optimization unchanged.

        This override only delegates to
        `SklearnLeakageDetector.hyperparameter_optimization`; the search
        strategy, search space and validation loss are handled there.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            The input data to be used for training during hyperparameter
            optimization.
        y : array-like of shape (n_samples,)
            The target values (class labels) corresponding to X.

        Returns
        -------
        int
            Whatever the parent implementation returns; `fit` uses it as the
            maximum number of training indices kept per split.

        Raises
        ------
        Exception
            Any error propagated from the parent implementation.
        """
        return super().hyperparameter_optimization(X, y)

    def fit(self, X, y):
        """Fit the selected MI estimators and record their MI estimates.

        If the detector is already fitted, only an informational message is
        logged. Otherwise hyperparameter optimization is run, and for each of
        the `n_hypothesis` (loss, parameters) pairs in `self.estimators` one
        estimator is built and re-fitted on the training part of every
        cross-validation split. After each fit the estimated mutual
        information is appended to `self.results`. The results are stored
        once all models have been processed.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            The input data used for training the models.
        y : array-like of shape (n_samples,)
            The target values (class labels) corresponding to X.

        Notes
        -----
        Only the estimated mutual information is recorded here; no baseline
        classifiers are evaluated by this method.
        """
        if self._is_fitted_:
            self.logger.info(f"Model already fitted for the padding {self.padding_code}")
            return

        train_size = self.hyperparameter_optimization(X, y)
        for i in range(self.n_hypothesis):
            loss, learner_params = self.estimators[i]
            self.logger.info(f"********** Model {i + 1} with loss {loss} **********")
            self.logger.info(f"Parameters {print_dictionary(learner_params)}")
            model = self.base_detector(**learner_params)
            # The test indices of each split are not needed: the estimator is
            # evaluated on the complete data below.
            for k, (train_index, _test_index) in enumerate(self.cv_iterator.split(X, y)):
                # Keep at most `train_size` training samples for this split.
                train_index = train_index[:train_size]
                model.fit(X=X[train_index], y=y[train_index])
                self.logger.info(
                    f"************************* Split {k + 1} **************************"
                )
                # NOTE(review): MI is estimated on the full (X, y), not on the
                # held-out part of the split -- confirm this is intended.
                metric_loss = model.estimate_mi(X, y)
                self.logger.info(f"Metric {ESTIMATED_MUTUAL_INFORMATION}: Value {metric_loss}")
                model_name = list(self.results.keys())[i]
                self.results[model_name][ESTIMATED_MUTUAL_INFORMATION].append(metric_loss)
        self.__store_results__()

    def detect(self, detection_method=None):
        """Run the parent's detection with the MI-based detection method.

        The `detection_method` argument is accepted for interface
        compatibility but ignored: the parent `detect` is always called with
        `"estimated_mutual_information"`.

        Parameters
        ----------
        detection_method : str, optional
            Ignored. The method `estimated_mutual_information` is always
            used.

        Returns
        -------
        detection_decision : bool
            Indicates whether any models showed significant leakage.
        hypothesis_rejected : int
            The number of models flagged for leakage.

        Notes
        -----
        The statistical test and any multiple-hypothesis correction are
        implemented by the parent class; the return values above are those
        documented for the parent `detect`.
        """
        return super().detect(detection_method="estimated_mutual_information")