Source code for autoqild.dataset_readers.open_ml_timming_dr

"""Reader for OpenML datasets focusing on timing features for data leakage
analysis."""

import logging
from abc import ABCMeta

import numpy as np
import openml
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import check_random_state

from .utils import *

__all__ = ["OpenMLTimingDatasetReader"]


[docs] class OpenMLTimingDatasetReader(metaclass=ABCMeta): """Reader for OpenML datasets that are specifically designed for timing- based attacks. This class is designed to process datasets that involve side-channel attacks based on timing, such as the Bleichenbacher timing attack. It reads, cleans, and processes the dataset, and provides methods to create datasets with class imbalance to simulate attack scenarios. Parameters ---------- dataset_id : int The ID of the OpenML dataset. imbalance : float The ratio of the number of minority class samples to the number of majority class samples. Must be between 0 and 1. create_datasets : bool, default=True If True, creates leakage datasets during initialization. random_state : int or RandomState instance, optional Random state for reproducibility. **kwargs : dict Additional keyword arguments. Attributes ---------- logger : logging.Logger Logger instance for logging information. dataset_id : int The ID of the OpenML dataset. imbalance : float The ratio of the number of minority class samples to the number of majority class samples. random_state : RandomState instance Random state for reproducibility. correct_class : str The correct class label, representing correctly formatted messages. vulnerable_classes : list of str List of class labels representing vulnerable (incorrectly formatted) messages. n_features : int Number of features in the dataset. fold_id : int The fold ID as specified in the dataset description. delay : int The delay associated with the timing attack in microseconds. dataset_dictionary : dict A dictionary where keys are vulnerable class labels and values are tuples of (X, y) for the respective classes. Private Methods --------------- __read_dataset__() Reads the dataset from OpenML and extracts relevant information. This method fetches the dataset using the OpenML API, extracts the raw data, and processes the dataset description to retrieve vulnerable class labels, number of features, and server information. __create_leakage_datasets__() Creates separate datasets for each class by selecting only the samples that belong to the correct class and one vulnerable class at a time. __clean_up_dataset__() Cleans and preprocesses the dataset. This method encodes categorical columns, formats class labels, fills missing values, and convert class label strings to integer values. """ def __init__( self, dataset_id: int, imbalance: float, create_datasets=True, random_state=None, **kwargs, ): self.logger = logging.getLogger(OpenMLTimingDatasetReader.__name__) self.dataset_id = dataset_id self.imbalance = imbalance self.random_state = check_random_state(random_state) self.correct_class = "Correctly_formatted_PKCS#1_PMS_message" self.vulnerable_classes = [] self.__read_dataset__() self.__clean_up_dataset__() if create_datasets: self.__create_leakage_datasets__()
[docs] def __read_dataset__(self): self.dataset = openml.datasets.get_dataset(self.dataset_id, download_data=True) # Access the dataset information self.data_frame_raw, _, _, self.attribute_names = self.dataset.get_data( dataset_format="dataframe" ) self.attribute_names.remove(LABEL_COL) self.dataset_dictionary = {} if self.correct_class not in self.data_frame_raw[LABEL_COL].unique(): raise ValueError(f"Dataframe is does not contain correct class {self.correct_class}") self.logger.info( f"Class Labels unformulated {list(self.data_frame_raw[LABEL_COL].unique())}" ) description = self.dataset.description vulnerable_classes_str = description.split("\n")[-1].split("vulnerable_classes ")[-1] vulnerable_classes_str = vulnerable_classes_str.strip("[]") self.vulnerable_classes = [s.strip() for s in vulnerable_classes_str.split(",")] self.n_features = len(self.dataset.features) - 1 self.fold_id = int(description.split("\n")[-2].split("fold_id ")[-1]) self.delay = int( description.split("Bleichenbacher Timing Attack: ")[-1].split(" micro seconds")[0] )
[docs] def __clean_up_dataset__(self): categorical_columns = self.data_frame_raw.select_dtypes(include=["object"]).columns label_encoder = LabelEncoder() for column in categorical_columns: if column != LABEL_COL: self.data_frame_raw[column] = label_encoder.fit_transform( self.data_frame_raw[column] ) self.data_frame_raw[LABEL_COL] = self.data_frame_raw[LABEL_COL].apply( lambda x: clean_class_label(x) ) self.correct_class = clean_class_label(self.correct_class) self.vulnerable_classes = [clean_class_label(s) for s in self.vulnerable_classes] labels = list(self.data_frame_raw[LABEL_COL].unique()) labels.sort() self.logger.info(f"Class Labels formatted {labels}") self.logger.info(f"Correct Padding {self.correct_class}") self.logger.info(f"Vulnerable Padding {self.vulnerable_classes}") labels.remove(self.correct_class) label_encoder = LabelEncoder() label_encoder.fit_transform(labels) self.label_mapping = dict( zip( label_encoder.classes_, label_encoder.transform(label_encoder.classes_) + 1, ) ) self.label_mapping = {**{self.correct_class: 0}, **self.label_mapping} self.inverse_label_mapping = dict((v, k) for k, v in self.label_mapping.items()) self.n_labels = len(self.label_mapping) self.data_frame = pd.DataFrame.copy(self.data_frame_raw) self.data_frame[LABEL_COL].replace(self.label_mapping, inplace=True) self.data_frame = self.data_frame.fillna(value=-1)
[docs] def __create_leakage_datasets__(self): self.dataset_dictionary = {} for j, label in self.inverse_label_mapping.items(): if label == self.correct_class: continue else: self.dataset_dictionary[label] = self.get_data(class_label=j)
[docs] def get_data(self, class_label=1): """Retrieves data for a specific class label. Parameters ---------- class_label : int, default=1 The class label for which to retrieve the data. Returns ------- X : array-like of shape (n_samples, n_features) Feature matrix. y : array-like of shape (n_samples,) Target vector. """ df = pd.DataFrame.copy(self.data_frame) p = [0, class_label] df = df[df.label.isin(p)] df[LABEL_COL].replace([class_label], 1, inplace=True) X, y = df[self.attribute_names].values, df[LABEL_COL].values.flatten() X, y = self.get_sampled_imbalanced_data(X, y) return X, y
[docs] def get_sampled_imbalanced_data(self, X, y): """Creates an imbalanced dataset by sampling from the data. Parameters ---------- X : array-like of shape (n_samples, n_features) Feature matrix. y : array-like of shape (n_samples,) Target vector. Returns ------- X : array-like of shape (n_samples, n_features) Feature matrix after applying sampling to create imbalance. y : array-like of shape (n_samples,) Target vector after applying sampling to create imbalance. """ if self.imbalance < 0.5: # total_instances = X.shape[0] n_0 = len(np.where(y == 0)[0]) n_1 = int(n_0 * (self.imbalance / (1 - self.imbalance))) self.logger.info( f"Before processing----ratio {n_1 / n_0} p {self.imbalance}, n_0 {n_0}, n_1 {n_1}----" ) ind0 = np.where(y == 0)[0] ind1 = self.random_state.choice(np.where(y == 1)[0], n_1) if n_1 < 200: ind0 = np.concatenate((ind0, ind0)) ind1 = self.random_state.choice(np.where(y == 1)[0], 2 * n_1) indx = np.concatenate((ind0, ind1)) self.random_state.shuffle(indx) X, y = X[indx], y[indx] n_0 = len(np.where(y == 0)[0]) n_1 = len(np.where(y == 1)[0]) self.logger.info( f"After processing----ratio {n_1 / n_0} p {self.imbalance}, n_0 {n_0}, n_1 {n_1}----" ) return X, y