Source code for climb.tool.impl_agpl.tool_data_valuation

import os
import pickle
from typing import Any, Dict

import pandas as pd
from pydvl.utils.dataset import Dataset  # noqa: F401  # type: ignore
from pydvl.utils.utility import Utility  # noqa: F401  # type: ignore
from pydvl.value import compute_shapley_values  # noqa: F401  # type: ignore
from pydvl.value.shapley import ShapleyMode  # noqa: F401  # type: ignore
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.utils import Bunch

from climb.common.utils import raise_if_extra_not_available

from ..tool_comms import ToolCommunicator, ToolReturnIter, execute_tool
from ..tools import ToolBase

raise_if_extra_not_available()


# TODO: Consider improving this function by running multiple times and taking mean shapely values as
# The number of records under the threshold is quite volatile.
[docs] def knn_shapley_valuation( tc: ToolCommunicator, data_file_path: str, target_variable: str, workspace: str, ) -> None: # General pre-processing df = pd.read_csv(data_file_path) df = clean_dataframe(df) # pydvl pre-processing data, X, y = preprocess_dataframe(df, target_variable) # Initialize the KNN classifier knn = KNeighborsClassifier(n_neighbors=5) # Create the Utility object utility = Utility(knn, data) # pyright: ignore tc.print("Running knn...") shapley_values = compute_shapley_values(utility, mode=ShapleyMode.KNN, progress=True) # NOTE: The thresholds here are heuristics. threshold = 0 df_shapley = pd.DataFrame( { "value": shapley_values.values, "index": shapley_values.indices, } ) # Filter Shapley values greater than 0 positive_df = df_shapley[df_shapley["value"] >= threshold] # Filter Shapley values less than 0 negative_df = df_shapley[df_shapley["value"] < threshold] # Save the results. tc.print("Saving the results...") results = { "shapley_values": shapley_values.values, "good_samples": positive_df["index"].values, "bad_samples": negative_df["index"].values, "df": df, } results_path = os.path.join(workspace, "knn_shapley_valuation_results.p") with open(results_path, "wb") as f: pickle.dump(results, f) # Load in all the data. tc.print("Preparing the plot...") tc.set_returns( tool_return=f""" Results saved to: `{results_path}`. This is a pickle file containing a dictionary with keys: {{ "shapley_values": the `ValuationResult` object containing all the Shapley values, "good_samples": numpy array with indices of good samples, "bad_samples": numpy array with indices of bad samples, "df": the DataFrame used for the analysis }} """, )
[docs] class KNNShapleyValuation(ToolBase): def _execute(self, **kwargs: Any) -> ToolReturnIter: real_data_path = os.path.join(self.working_directory, kwargs["data_file_path"]) target_variable = kwargs["target_variable"] thrd, out_stream = execute_tool( knn_shapley_valuation, wd=self.working_directory, data_file_path=real_data_path, target_variable=target_variable, workspace=self.working_directory, ) self.tool_thread = thrd return out_stream @property def name(self) -> str: return "knn_shapley_data_valuation" @property def description(self) -> str: return ( "This is a data valuation tool. It tool uses the KNN algorithm to compute Shapley values for each feature in the dataset. " "The tool returns a list of features with positive Shapley values, which are considered good predictors, " "and a list of features with negative Shapley values, which are considered bad predictors." "The user may want to exclude the bad predictors from their model to improve its performance." ) @property def specification(self) -> Dict[str, Any]: return { "type": "function", "function": { "name": self.name, "description": self.description, "parameters": { "type": "object", "properties": { "data_file_path": {"type": "string", "description": "Path to the data file."}, "target_variable": {"type": "string", "description": "Name of the target variable."}, }, }, }, } @property def description_for_user(self) -> str: return ( "This is a data valuation tool. It uses the KNN algorithm to compute Shapley values for each feature in the dataset. " "The Shapley value of a feature is a measure of its importance in predicting the target variable. " "The tool returns a list of features with positive Shapley values, which are considered good predictors, " "and a list of features with negative Shapley values, which are considered bad predictors." "You may want to exclude the bad predictors from your model to improve its performance." )
[docs] def clean_dataframe(df, unique_threshold=15): # Identify column data types inferred_categorical_columns = [] inferred_numerical_columns = [] inferred_boolean_columns = [] for col in df.columns: unique_values = df[col].dropna().unique() # Drop NA to get unique values num_unique_values = len(unique_values) if df[col].dtype == "bool": inferred_boolean_columns.append(col) elif num_unique_values < unique_threshold or df[col].dtype == "object": inferred_categorical_columns.append(col) elif pd.api.types.is_numeric_dtype(df[col]): inferred_numerical_columns.append(col) else: # Handle mixed or unexpected data types try: df[col] = pd.to_numeric(df[col], errors="coerce") inferred_numerical_columns.append(col) except ValueError: inferred_categorical_columns.append(col) numerical_columns = [ col for col in inferred_numerical_columns if col not in inferred_categorical_columns and col not in inferred_boolean_columns ] categorical_columns = inferred_categorical_columns boolean_columns = inferred_boolean_columns # Convert categorical columns to category indices for col in categorical_columns: df[col] = pd.Categorical(df[col]).codes # Clean numerical columns for col in numerical_columns: df[col] = pd.to_numeric(df[col], errors="coerce") # Handle missing values - example: fill with the median df[col] = df[col].fillna(df[col].median()) # Convert boolean columns to integers for col in boolean_columns: df[col] = df[col].astype(int) return df
[docs] def preprocess_dataframe(df: pd.DataFrame, target_column: str, test_size=0.2, random_state=42): # Split the DataFrame into features and target X = df.drop(columns=[target_column]) y = df[target_column] # Optionally, you can split the data into training and testing sets X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state) # Convert the training data into a scikit-learn-like Bunch object sklearn_dataset = Bunch(data=X_train.values, target=y_train.values, feature_names=X_train.columns.tolist()) # Create a PyDVL Dataset from the scikit-learn dataset pydvl_dataset = Dataset.from_sklearn(sklearn_dataset) return pydvl_dataset, X_test.values, y_test.values