"""A versatile leakage detection class built on top of the scikit-learn
framework, supporting multiple estimators."""
import copy
import gc
import logging
import os
import torch
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.model_selection import train_test_split
from .ild_base_class import InformationLeakageDetector
from ..automl.tabpfn_classifier import AutoTabPFNClassifier
from ..bayes_search import *
from ..utilities import *
class SklearnLeakageDetector(InformationLeakageDetector):
    """Detect information leakage using scikit-learn compatible estimators.

    This class extends the `InformationLeakageDetector` base class. It tunes
    the hyperparameters of ``self.base_detector`` with a Bayesian search
    (`BayesSearchCV`), keeps the ``n_hypothesis`` best configurations and then
    re-fits and evaluates each of them on the splits produced by
    ``self.cv_iterator``.

    Parameters
    ----------
    padding_name : str
        The name of the padding method used in the experiments.
    learner_params : dict
        Keyword arguments used to construct the estimator
        (``self.base_detector(**learner_params)``).
    fit_params : dict
        Extra keyword arguments forwarded to ``BayesSearchCV.fit``.
    hash_value : str
        A unique hash value identifying the experiment; used as a path
        component for the optimizer file.
    cv_iterations : int
        The number of cross-validation iterations (forwarded to the parent
        class).
    n_hypothesis : int
        The number of top configurations (hypotheses) that are evaluated.
    base_directory : str
        The base directory under which the optimizer file is stored.
    search_space : dict
        The hyperparameter search space for Bayesian optimization.
    hp_iters : int
        The number of iterations for hyperparameter optimization.
    n_inner_folds : int
        The number of splits of the inner cross-validation used during
        hyperparameter optimization.
    validation_loss : str
        The scoring passed to the Bayesian search.
    random_state : int or RandomState instance, optional
        Seed / random state used for the inner splits, the Bayesian search
        and the dataset reduction.
    **kwargs : dict, optional
        Additional keyword arguments passed to the parent class.

    Attributes
    ----------
    search_space : dict
        The hyperparameter search space used in Bayesian optimization.
    hp_iters : int
        The number of iterations for hyperparameter optimization.
    n_inner_folds : int
        Number of splits for the inner cross-validation.
    validation_loss : str
        The scoring used during hyperparameter tuning.
    inner_cv_iterator : StratifiedShuffleSplit
        Cross-validation iterator used for the inner splits.
    tabpfn_folder : str
        Path of the ``<padding_code>.pkl`` file handed to the Bayesian search
        as ``optimizers_file_path`` (a file path, despite the attribute name).
    n_jobs : int
        Number of parallel jobs for the hyperparameter search.
    logger : logging.Logger
        Logger instance for recording the process of leakage detection.
    estimators : list
        Set by `hyperparameter_optimization`; one ``[loss, learner_params]``
        pair per hypothesis.
    """

    # Fraction of the samples held out in every inner (tuning) split.
    INNER_CV_TEST_SIZE = 0.30
    # Default number of parallel jobs used by the Bayesian search.
    DEFAULT_N_JOBS = 10
    # Largest number of instances handed to an AutoTabPFNClassifier.
    MAX_TABPFN_INSTANCES = 4000

    def __init__(
        self,
        padding_name,
        learner_params,
        fit_params,
        hash_value,
        cv_iterations,
        n_hypothesis,
        base_directory,
        search_space,
        hp_iters,
        n_inner_folds,
        validation_loss,
        random_state=None,
        **kwargs,
    ):
        super().__init__(
            padding_name=padding_name,
            learner_params=learner_params,
            fit_params=fit_params,
            hash_value=hash_value,
            cv_iterations=cv_iterations,
            n_hypothesis=n_hypothesis,
            base_directory=base_directory,
            random_state=random_state,
            **kwargs,
        )
        self.search_space = search_space
        self.hp_iters = hp_iters
        self.n_inner_folds = n_inner_folds
        self.validation_loss = validation_loss
        # `self.random_state` and `self.padding_code` are not assigned in this
        # class; presumably the parent constructor sets them -- confirm.
        self.inner_cv_iterator = StratifiedShuffleSplit(
            n_splits=self.n_inner_folds,
            test_size=self.INNER_CV_TEST_SIZE,
            random_state=self.random_state,
        )
        self.tabpfn_folder = os.path.join(
            base_directory, OPTIMIZER_FOLDER, hash_value, f"{self.padding_code}.pkl"
        )
        # NOTE(review): the second argument presumably marks the path as a
        # file so that only its parent directory is created -- confirm against
        # `create_directory_safely`.
        create_directory_safely(self.tabpfn_folder, True)
        self.logger = logging.getLogger(SklearnLeakageDetector.__name__)
        self.n_jobs = self.DEFAULT_N_JOBS

    def hyperparameter_optimization(self, X, y):
        """Run the Bayesian hyperparameter search and collect the best
        configurations.

        A `BayesSearchCV` is fitted on the (possibly reduced) training data
        using ``self.inner_cv_iterator`` and ``self.validation_loss``.
        Afterwards ``self.estimators`` is rebuilt with one
        ``[loss, learner_params]`` entry per hypothesis, obtained from
        `update_params_at_k` for ``k = 0 .. n_hypothesis - 1``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            The input data to be used for training during hyperparameter
            optimization.
        y : array-like of shape (n_samples,)
            The target values (class labels) corresponding to X.

        Returns
        -------
        int
            The number of training instances actually used for the search
            (after `reduce_dataset`).

        Notes
        -----
        An exception raised while fitting the search is logged and *not*
        re-raised; the method then continues with whatever state the search
        object holds.
        """
        X_train, y_train = self.__get_training_dataset__(X, y)
        learner = self.base_detector(**self.learner_params)
        bayes_search = BayesSearchCV(
            estimator=learner,
            search_spaces=self.search_space,
            n_iter=self.hp_iters,
            scoring=self.validation_loss,
            n_jobs=self.n_jobs,
            cv=self.inner_cv_iterator,
            error_score=0,
            random_state=self.random_state,
            optimizers_file_path=self.tabpfn_folder,
        )
        # Sorted keys give a stable parameter order for logging and lookup.
        search_keys = sorted(self.search_space.keys())
        self.logger.info(f"Search Keys {search_keys}")
        callback = log_callback(search_keys)
        X_train, y_train = self.reduce_dataset(X_train, y_train)
        try:
            bayes_search.fit(X_train, y_train, groups=None, callback=callback, **self.fit_params)
        except Exception as error:
            # Best effort: keep going so that results gathered before the
            # failure (if any) can still be used below.
            log_exception_error(self.logger, error)
            self.logger.error(" Cannot fit the Bayes SearchCV ")
        train_size = X_train.shape[0]

        # Drop the local reference to the template estimator and release
        # cached memory before the per-hypothesis models are trained.
        del learner
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        self.estimators = []
        for i in range(self.n_hypothesis):
            # Deep copy so that every hypothesis owns its parameter dict.
            learner_params = copy.deepcopy(self.learner_params)
            loss, learner_params = update_params_at_k(
                bayes_search, search_keys, learner_params, k=i
            )
            self.estimators.append([loss, learner_params])
        return train_size

    def fit(self, X, y):
        """Tune, train and evaluate the models on every cross-validation split.

        If the detector is already fitted only a log message is emitted.
        Otherwise `hyperparameter_optimization` is run first and each of the
        ``n_hypothesis`` selected configurations is trained and evaluated on
        every split of ``self.cv_iterator``. Finally the results are stored
        via ``__store_results__``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            The input data used for training the models.
        y : array-like of shape (n_samples,)
            The target values (class labels) corresponding to X.

        Notes
        -----
        For the first hypothesis the random-classifier and majority-voting
        baselines are computed on the same splits for comparison.
        """
        # NOTE(review): the nesting below was reconstructed from a listing
        # that had lost its indentation -- confirm that the `base_path`
        # cleanup runs per split and that results are stored only after a
        # fresh fit.
        if self._is_fitted_:
            self.logger.info(f"Model already fitted for the padding {self.padding_code}")
        else:
            train_size = self.hyperparameter_optimization(X, y)
            for i in range(self.n_hypothesis):
                loss, learner_params = self.estimators[i]
                self.logger.info(f"********** Model {i + 1} with loss {loss} **********")
                self.logger.info(f"Parameters {print_dictionary(learner_params)}")
                for k, (train_index, test_index) in enumerate(self.cv_iterator.split(X, y)):
                    # Cap the training part at the size used during tuning.
                    train_index = train_index[:train_size]
                    X_train, X_test = X[train_index], X[test_index]
                    y_train, y_test = y[train_index], y[test_index]
                    model = self.base_detector(**learner_params)
                    X_train, y_train = self.reduce_dataset(X_train, y_train)
                    X_test, y_test = self.reduce_dataset(X_test, y_test)
                    model.fit(X=X_train, y=y_train)
                    p_pred, y_pred = get_scores(X_test, model)
                    self.logger.info(
                        f"************************* Split {k + 1} **************************"
                    )
                    self.evaluate_scores(X_test, X_train, y_test, y_train, y_pred, p_pred, model, i)
                    if i == 0:
                        # Baselines do not depend on the hypothesis, so they
                        # are computed once per split.
                        self.__calculate_random_classifier_accuracy__(
                            X_train, y_train, X_test, y_test
                        )
                        self.__calculate_majority_voting_accuracy__(
                            X_train, y_train, X_test, y_test
                        )
                    directory_path = learner_params.get("base_path", None)
                    if directory_path is not None:
                        # `os.rmdir` only removes empty directories; failures
                        # are logged and otherwise ignored.
                        try:
                            os.rmdir(directory_path)
                            self.logger.info(f"The directory `{directory_path}` has been removed.")
                        except OSError as e:
                            self.logger.error(f"Error: {directory_path} : {e.strerror}")
            self.__store_results__()

    def reduce_dataset(self, X, y):
        """Subsample the data when it is too large for `AutoTabPFNClassifier`.

        The data is reduced only if ``self.base_detector`` is
        `AutoTabPFNClassifier` and ``X`` has more than
        ``MAX_TABPFN_INSTANCES`` rows. In that case a stratified subsample of
        exactly ``MAX_TABPFN_INSTANCES`` instances is drawn with
        `train_test_split`; otherwise the inputs are returned unchanged.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            The input feature matrix.
        y : array-like of shape (n_samples,)
            The target values (class labels) corresponding to X.

        Returns
        -------
        tuple
            ``(X, y)``, reduced if the conditions above are met.
        """
        if X.shape[0] > self.MAX_TABPFN_INSTANCES and self.base_detector == AutoTabPFNClassifier:
            reduced_size = self.MAX_TABPFN_INSTANCES
            self.logger.info(f"Initial instances {X.shape[0]} reduced to {reduced_size}")
            X, _, y, _ = train_test_split(
                X,
                y,
                train_size=reduced_size,
                stratify=y,
                random_state=self.random_state,
            )
        return X, y

    def evaluate_scores(self, X_test, X_train, y_test, y_train, y_pred, p_pred, model, n_model):
        """Evaluate and store the performance metrics of one trained model.

        This override forwards all arguments unchanged to the parent
        implementation, which performs the actual metric computation and
        bookkeeping.

        Parameters
        ----------
        X_test : array-like of shape (n_samples, n_features)
            The feature matrix for the test set.
        X_train : array-like of shape (n_samples, n_features)
            The feature matrix for the training set.
        y_test : array-like of shape (n_samples,)
            The true target labels for the test data.
        y_train : array-like of shape (n_samples,)
            The true target labels for the training data.
        y_pred : array-like of shape (n_samples,)
            The predicted target labels for the test set.
        p_pred : array-like of shape (n_samples, n_classes)
            The predicted class probabilities for the test data.
        model : object
            The trained model being evaluated.
        n_model : int
            The index of the model in the list of evaluated models.
        """
        super().evaluate_scores(
            X_test=X_test,
            X_train=X_train,
            y_test=y_test,
            y_train=y_train,
            y_pred=y_pred,
            p_pred=p_pred,
            model=model,
            n_model=n_model,
        )

    def detect(self, detection_method="log_loss_mi"):
        """Run the leakage detection with the specified method.

        This override fixes the default method to ``"log_loss_mi"`` and
        delegates to the parent implementation.

        Parameters
        ----------
        detection_method : str
            The method to use for detecting information leakage. Options include:

            - `paired-t-test`: Uses paired t-test to compare the accuracy of models against the majority voting baseline.
            - `paired-t-test-random`: Uses paired t-test to compare the accuracy of models against a random classifier.
            - `fishers-exact-mean`: Applies Fisher's Exact Test on the confusion matrix and computes the mean p-value.
            - `fishers-exact-median`: Applies Fisher's Exact Test on the confusion matrix and computes the median p-value.
            - `mid_point_mi`: Detects leakage using the midpoint mutual information estimation.
            - `log_loss_mi`: Detects leakage using log loss mutual information estimation.
            - `log_loss_mi_isotonic_regression`: Uses log loss mutual information estimation with isotonic regression calibration.
            - `log_loss_mi_platt_scaling`: Uses log loss mutual information estimation with Platt scaling calibration.
            - `log_loss_mi_beta_calibration`: Uses log loss mutual information estimation with beta calibration.
            - `log_loss_mi_temperature_scaling`: Uses log loss mutual information estimation with temperature scaling.
            - `log_loss_mi_histogram_binning`: Uses log loss mutual information estimation with histogram binning.
            - `p_c_softmax_mi`: Uses PC-Softmax mutual information estimation for detection.

        Returns
        -------
        detection_decision : bool
            Indicates whether any models showed significant leakage.
        hypothesis_rejected : int
            The number of models flagged for leakage.

        Notes
        -----
        The parent implementation is documented to apply a Holm-Bonferroni
        correction to control the family-wise error rate for multiple models.
        """
        return super().detect(detection_method=detection_method)