Source code for climb.tool.impl_agpl.tool_outlier_detection

# Imports
import os
from pathlib import Path
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd
from cleanlab.outlier import OutOfDistribution  # noqa: F401  # type: ignore
from matplotlib import pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold

from climb.common.utils import raise_if_extra_not_available

from ..tool_comms import ToolCommunicator, ToolReturnIter, execute_tool
from ..tools import ToolBase

# Import-time guard: runs as soon as this module is imported.
# Presumably raises when the optional "extra" dependencies this tool needs (e.g. cleanlab,
# imported above) are not installed — confirm in climb.common.utils.
raise_if_extra_not_available()


def clean_dataframe(df: pd.DataFrame, unique_threshold: int = 15) -> pd.DataFrame:
    """
    Cleans the dataframe by encoding categorical variables, handling missing values, and converting data types.

    Every column is put into exactly one of three groups:

    - boolean: numpy ``bool`` dtype. Cast to 0/1 integers.
    - categorical: fewer than ``unique_threshold`` distinct non-null values, or a text dtype
      (``object`` or pandas ``StringDtype``). NaN is filled with ``"Missing"``, then values are
      replaced by integer category codes.
    - numerical: everything else. Coerced with ``pd.to_numeric(errors="coerce")``, then NaN is
      filled with the column median.

    The input dataframe is NOT modified; a cleaned copy is returned with the same index and
    column order.

    Parameters:
    - df (pd.DataFrame): The input dataframe to clean.
    - unique_threshold (int): Columns with fewer distinct non-null values than this are treated
      as categorical, even if numeric.

    Returns:
    - pd.DataFrame: The cleaned dataframe.
    """
    # Work on a copy so the caller's frame is left untouched.
    df = df.copy()

    categorical_columns = []
    numerical_columns = []
    boolean_columns = []
    for col in df.columns:
        series = df[col]
        if series.dtype == "bool":
            boolean_columns.append(col)
        elif (
            series.nunique(dropna=True) < unique_threshold
            or series.dtype == "object"
            # pandas' dedicated string dtype. Without this check, a high-cardinality text
            # column would be coerced to numeric below and silently turn into all-NaN.
            or isinstance(series.dtype, pd.StringDtype)
        ):
            categorical_columns.append(col)
        else:
            # Numeric dtypes, plus any other dtype (e.g. datetime) that is coerced to numbers.
            numerical_columns.append(col)

    # Convert categorical columns to category indices; NaN becomes its own "Missing" category.
    for col in categorical_columns:
        df[col] = pd.Categorical(df[col].fillna("Missing")).codes

    # Coerce to numbers (unparseable values -> NaN), then fill missing values with the median.
    for col in numerical_columns:
        values = pd.to_numeric(df[col], errors="coerce")
        df[col] = values.fillna(values.median())

    # Convert boolean columns to integers.
    for col in boolean_columns:
        df[col] = df[col].astype(int)

    return df


def _survival_to_classification_target(df, event_variable, time_variable):
    """Replace a survival (time, event) column pair with a binary label usable by a classifier.

    The label is 1 when ``df[event_variable] == 1`` and ``df[time_variable]`` is strictly below
    the median of ``df[time_variable]``, otherwise 0. The label column is added to ``df`` in place;
    the returned frame is a copy of ``df`` without the time and event columns.

    Returns:
        (pd.DataFrame, str): the converted frame and the name of the label column.
    """
    label_column = "TEMP_CLASSIFICATION_FROM_SURVIVAL_EVENT_COL"
    time_horizon = df[time_variable].median()
    df[label_column] = np.where((df[time_variable] < time_horizon) & (df[event_variable] == 1), 1, 0)
    return df.drop(columns=[time_variable, event_variable]), label_column


def _kfold_ood_scores(tc: "ToolCommunicator", X, y, n_splits=5, random_state=42):
    """Return one cleanlab out-of-distribution score per row, computed out-of-fold.

    The OOD estimator needs predicted probabilities for data the model was not fitted on. So each
    shuffled K-fold split fits a RandomForestClassifier on the training folds, then scores only the
    held-out fold. K-fold test folds partition the rows, so every row is scored exactly once.
    """
    scores = np.zeros(len(X))
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    for fold, (train_index, test_index) in enumerate(kf.split(X)):
        tc.print(f"Processing Fold {fold + 1}/{n_splits}")
        model = RandomForestClassifier(n_estimators=100, random_state=random_state)
        model.fit(X[train_index], y[train_index])
        pred_probs = model.predict_proba(X[test_index])
        scores[test_index] = OutOfDistribution().fit_score(pred_probs=pred_probs, labels=y[test_index])
    return scores


def _plot_ood_score_distribution(scores, low_threshold, out_path):
    """Save a histogram of ``scores`` with ``low_threshold`` marked, and return the (closed) figure.

    Uses an explicit Figure/Axes rather than pyplot's implicit "current figure". This tool is
    presumably run off the main thread (``execute_tool`` hands back a thread), and pyplot's
    current-figure state is process-global.
    """
    fig, ax = plt.subplots()
    ax.hist(scores, bins=50, alpha=0.7, color="skyblue", edgecolor="black")
    ax.axvline(
        low_threshold,  # pyright: ignore
        color="green",
        linestyle="dashed",
        linewidth=2,
        label=f"5th Percentile ({low_threshold:.4f})",
    )
    ax.set_title("Distribution of Averaged OOD Scores with Low Threshold")
    ax.set_xlabel("OOD Score")
    ax.set_ylabel("Frequency")
    ax.legend()
    fig.savefig(out_path, bbox_inches="tight")
    # Release pyplot's reference; the Figure object is still returned for the user report.
    plt.close(fig)
    return fig


def cleanlab_outlier_detection(
    tc: ToolCommunicator,
    data_file_path: str,
    cleaned_file_path: str,
    target_variable: str,
    workspace: str,
    time_variable: Optional[str] = None,
    task_type: str = "classification",
) -> None:
    """Detect and remove out-of-distribution rows from a CSV dataset using cleanlab.

    Steps:
    1. Load ``workspace / data_file_path``.
    2. For ``survival_analysis``, replace the (time, event) columns with a binary label
       ("event occurred before the median time").
    3. Encode a working copy with ``clean_dataframe``.
    4. Score every row out-of-fold with 5-fold CV, a RandomForestClassifier, and
       ``cleanlab.outlier.OutOfDistribution``.
    5. Flag rows scoring strictly below the 5th percentile as outliers.
    6. Write the remaining rows of the ORIGINAL (unencoded) data to
       ``workspace / cleaned_file_path``, with the target column moved last. Save a histogram of
       the scores to ``workspace / "outlier_detection.png"``.

    Args:
        tc: Tool communicator used for progress messages and for returning results.
        data_file_path: CSV path, relative to ``workspace`` (an absolute path is used as-is).
        cleaned_file_path: Output CSV path, relative to ``workspace``.
        target_variable: Label column (classification) or event-indicator column (survival).
        workspace: Directory the paths are resolved against and where the plot is saved.
        time_variable: Time-to-event column; required when ``task_type == "survival_analysis"``.
        task_type: ``"classification"`` or ``"survival_analysis"``.

    Raises:
        ValueError: For an unsupported ``task_type``, or a survival task without ``time_variable``.
    """
    if task_type not in ["classification", "survival_analysis"]:
        raise ValueError(
            f"`task_type` must be 'classification' or 'survival_analysis'. Cleanlab does not support {task_type}."
        )
    # Validate arguments before doing any I/O.
    if task_type == "survival_analysis" and time_variable is None:
        raise ValueError("For survival analysis tasks, `time_variable` must be provided.")

    tc.print("Loading the data...")
    workspace_path = Path(workspace)
    data_path = workspace_path / data_file_path
    cleaned_path = workspace_path / cleaned_file_path
    # Kept exactly as read: the cleaned output is a row subset of this frame, so dtypes are preserved.
    df_original = pd.read_csv(data_path)

    # Working copy that is transformed and encoded for the model only.
    df = df_original.copy()
    model_target = target_variable
    if task_type == "survival_analysis":
        df, model_target = _survival_to_classification_target(df, target_variable, time_variable)

    df = clean_dataframe(df)

    # Outlier masks computed on `df` are applied to `df_original`, so the rows must line up.
    assert len(df) == len(df_original), "Row count mismatch after cleaning."
    assert all(df.index == df_original.index), "Row order mismatch after cleaning."

    X = df.drop(columns=[model_target]).values
    y = df[model_target].values
    ood_scores = _kfold_ood_scores(tc, X, y)

    # Rows whose score falls below the 5th percentile are treated as outliers.
    low_threshold = np.percentile(ood_scores, 5)
    tc.print(f"Threshold (5th percentile): {low_threshold:.4f}")
    mask_outliers = ood_scores < low_threshold
    n_outliers = int(mask_outliers.sum())
    tc.print(f"Number of Outliers (<{low_threshold:.4f}): {n_outliers}")

    outlier_plot = _plot_ood_score_distribution(ood_scores, low_threshold, workspace_path / "outlier_detection.png")

    # Select inlier rows straight from the original frame. Going through `.values` would upcast
    # integer columns to float (or everything to object). The column order matches the previous
    # output: features first, target last.
    feature_columns = [col for col in df_original.columns if col != target_variable]
    df_cleaned = df_original.loc[~mask_outliers, feature_columns + [target_variable]]
    tc.print("Saving cleaned data...")
    df_cleaned.to_csv(cleaned_path, index=False)

    tc.set_returns(
        tool_return=f"{n_outliers} outliers were removed. The cleaned data has been saved to {cleaned_path}",
        user_report=[
            "📊 **Outlier Detection**",
            "Outliers plot:",
            outlier_plot,
        ],
        files_in=[data_path.name],
        files_out=[cleaned_path.name],
    )


class CleanlabOutlierDetection(ToolBase):
    """Expose ``cleanlab_outlier_detection`` as the ``outlier_detection`` tool."""

    def _execute(self, **kwargs: Any) -> ToolReturnIter:
        """Start the outlier-detection tool through ``execute_tool`` and return its output stream.

        The thread handle returned by ``execute_tool`` is stored on ``self.tool_thread``.
        """
        working_dir = self.working_directory
        # NOTE(review): both paths are joined with the working directory here, and
        # cleanlab_outlier_detection joins them with ``workspace`` (the same directory) again.
        # That second join is a no-op only if ``working_directory`` is absolute — confirm.
        in_path = os.path.join(working_dir, kwargs["data_file_path"])
        out_path = os.path.join(working_dir, kwargs["cleaned_file_path"])
        self.tool_thread, out_stream = execute_tool(
            cleanlab_outlier_detection,
            wd=working_dir,
            data_file_path=in_path,
            cleaned_file_path=out_path,
            target_variable=kwargs["target_variable"],
            time_variable=kwargs.get("time_variable"),
            task_type=kwargs["task_type"],
            workspace=working_dir,
        )
        return out_stream

    @property
    def name(self) -> str:
        return "outlier_detection"

    @property
    def description(self) -> str:
        return (
            " Identifies and removes outliers from a dataset, enhancing the quality and reliability of the data for"
            " subsequent analytical or modeling tasks. By leveraging Cleanlab's out_of_distribution function, this"
            " function systematically detects anomalous data points through a K-Fold Cross-Validation approach. The"
            " cleaned dataset is then saved for further use, ensuring that outliers do not skew the results of"
            " downstream processes. This function is only compatible with classification and survival analysis"
            " problems. "
        )

    @property
    def specification(self) -> Dict[str, Any]:
        """Function-calling schema: one string property per tool argument; ``time_variable`` is optional."""
        argument_schemas = {
            "data_file_path": {"type": "string", "description": "Path to the data file."},
            "cleaned_file_path": {
                "type": "string",
                "description": "Path to the cleaned data file that we create by calling this function.",
            },
            "target_variable": {"type": "string", "description": "Name of the target variable."},
            "time_variable": {
                "type": "string",
                "description": (
                    "The time to event column. "
                    "This is only applicable for survival analysis tasks, where it is mandatory."
                ),
            },
            "task_type": {
                "type": "string",
                "enum": ["classification", "survival_analysis"],
                "description": "Type of problem (classification or survival_analysis).",
            },
        }
        mandatory = ["data_file_path", "cleaned_file_path", "target_variable", "task_type"]
        function_spec = {
            "name": self.name,
            "description": self.description,
            "parameters": {"type": "object", "properties": argument_schemas, "required": mandatory},
        }
        return {"type": "function", "function": function_spec}

    @property
    def description_for_user(self) -> str:
        return "Identifies and removes outliers from a dataset"