Source code for climb.tool.impl.smart_testing_helpers.utils

# Imports
import re
from copy import deepcopy

import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency, ttest_ind
from sklearn.metrics import accuracy_score
from statsmodels.stats.contingency_tables import mcnemar


def chi_square_test_for_accuracy(df, model, query):
    """
    Chi-square test relating subgroup membership to prediction correctness.

    The rows selected by ``query`` and the rows selected by ``not (query)`` are
    each scored with ``model.predict``. The correct/incorrect counts of both
    groups form a 2x2 table that is handed to ``chi2_contingency``.

    Parameters:
        df (pd.DataFrame): Features plus the target stored in column ``"y"``.
        model: Object exposing ``predict(features)``.
        query (str): ``DataFrame.query`` expression selecting the subgroup.

    Returns:
        float: The p-value of the test, or ``np.nan`` when either group is
        empty or any step raises (the error is printed, not propagated).
    """

    def _hit_and_miss_counts(rows):
        # Give the predictions the rows' index so the comparison with ``y``
        # is label-aligned.
        guesses = pd.Series(model.predict(rows.drop("y", axis=1)), index=rows.index)
        return np.sum(rows["y"] == guesses), np.sum(rows["y"] != guesses)

    try:
        inside = df.query(query)
        if inside.empty:
            print(f"No data found for the query: {query}")
            return np.nan
        hits_inside, misses_inside = _hit_and_miss_counts(inside)

        outside = df.query(f"not ({query})")
        if outside.empty:
            print(f"No data found for the complementary group of the query: {query}")
            return np.nan
        hits_outside, misses_outside = _hit_and_miss_counts(outside)

        # Rows: correct / incorrect. Columns: subgroup / complement.
        contingency = [[hits_inside, hits_outside], [misses_inside, misses_outside]]
        _, p_value, _, _ = chi2_contingency(contingency)
        return p_value
    except Exception as e:
        print(f"Error in chi_square_test_for_accuracy: {e}")
        return np.nan
def bootstrapping_test_for_accuracy_string(df, model, subgroup, num_bootstrap_samples=200):
    """
    Bootstrap test for an accuracy gap between a pre-selected subgroup and the rest of ``df``.

    The subgroup is passed in as a DataFrame (not as a query string). The
    complementary group is every row of ``df`` whose index label does not occur
    in ``subgroup.index``, so ``subgroup`` must carry the same index labels as
    the corresponding rows of ``df``.

    Parameters:
        df (pd.DataFrame): Features plus the target stored in column ``"y"``.
        model: Object exposing ``predict(features)``.
        subgroup (pd.DataFrame): Rows of the subgroup, including column ``"y"``.
        num_bootstrap_samples (int): Number of bootstrap resamples.

    Returns:
        float: Share of resamples whose absolute accuracy gap is at least the
        observed absolute gap. ``np.nan`` when the subgroup or its complement
        is empty, or when any step raises (the error is printed).
    """
    try:
        # An empty subgroup has no accuracy. Without this guard the NaN means
        # below make every comparison false and the function reports p = 0.
        if subgroup.empty:
            print("No subgroup found.")
            return np.nan

        # Define the complementary group based on the subgroup's index
        remainder = df.loc[~df.index.isin(subgroup.index)].copy()
        if remainder.empty:
            print("No complementary group found.")
            return np.nan

        # Predictions get the rows' index so the comparison with ``y`` is label-aligned.
        subgroup_predictions = pd.Series(model.predict(subgroup.drop("y", axis=1)), index=subgroup.index)
        remainder_predictions = pd.Series(model.predict(remainder.drop("y", axis=1)), index=remainder.index)

        # 1 where the prediction matches the target, 0 otherwise.
        subgroup_correct = (subgroup["y"] == subgroup_predictions).astype(int)
        remainder_correct = (remainder["y"] == remainder_predictions).astype(int)

        pooled_accuracies = np.concatenate([subgroup_correct, remainder_correct])
        observed_diff = np.mean(subgroup_correct) - np.mean(remainder_correct)

        n_subgroup = len(subgroup)
        n_pooled = len(pooled_accuracies)
        bootstrap_diffs = []
        # Under the null hypothesis group labels are exchangeable: resample the
        # pooled flags and split them at the original subgroup size.
        for _ in range(num_bootstrap_samples):
            resampled_indices = np.random.choice(n_pooled, size=n_pooled, replace=True)
            resampled = pooled_accuracies[resampled_indices]
            bootstrap_diffs.append(np.mean(resampled[:n_subgroup]) - np.mean(resampled[n_subgroup:]))

        p_value = np.sum(np.abs(bootstrap_diffs) >= np.abs(observed_diff)) / num_bootstrap_samples
        return p_value
    except Exception as e:
        print(f"Error in bootstrapping_test_for_accuracy_string: {e}")
        return np.nan
def bootstrapping_test_for_accuracy(df, model, query, num_bootstrap_samples=200):
    """
    Bootstrap test for an accuracy gap between a queried subgroup and its complement.

    Parameters:
        df (pd.DataFrame): Features plus the target stored in column ``"y"``.
        model: Object exposing ``predict(features)``.
        query (str): ``DataFrame.query`` expression selecting the subgroup;
            line breaks are replaced by spaces before evaluation.
        num_bootstrap_samples (int): Number of bootstrap resamples.

    Returns:
        float: Share of resamples whose absolute accuracy gap is at least the
        observed absolute gap. ``np.nan`` when either group is empty or any
        step raises (the error is printed).
    """
    try:
        # Flatten the expression onto a single line before evaluating it.
        flat_query = query.replace("\n", " ").replace("\r", " ").strip()

        members = df.query(flat_query)
        others = df.query(f"not ({flat_query})")
        if members.empty or others.empty:
            print(f"Empty subgroup or complementary group for query: {flat_query}")
            return np.nan

        # Predictions get the rows' index so the comparison with ``y`` is label-aligned.
        member_preds = pd.Series(model.predict(members.drop("y", axis=1)), index=members.index)
        other_preds = pd.Series(model.predict(others.drop("y", axis=1)), index=others.index)

        # 1 where the prediction matches the target, 0 otherwise.
        member_hits = (members["y"] == member_preds).astype(int)
        other_hits = (others["y"] == other_preds).astype(int)

        pooled_hits = np.concatenate([member_hits, other_hits])
        observed_gap = np.mean(member_hits) - np.mean(other_hits)

        split_at = len(members)
        pool_size = len(pooled_hits)

        def _resampled_gap():
            # One resample with replacement from the pooled flags, split at
            # the original subgroup size.
            draw = pooled_hits[np.random.choice(pool_size, size=pool_size, replace=True)]
            return np.mean(draw[:split_at]) - np.mean(draw[split_at : split_at + len(others)])

        simulated_gaps = [_resampled_gap() for _ in range(num_bootstrap_samples)]

        return np.sum(np.abs(simulated_gaps) >= np.abs(observed_gap)) / num_bootstrap_samples
    except Exception as e:
        print(f"Error in bootstrapping_test_for_accuracy: {e}")
        return np.nan
def welchs_t_test_for_accuracy(df, model, query):
    """
    Welch's t-test on per-row correctness of a subgroup versus its complement.

    Each group is scored with ``model.predict`` and turned into a 0/1 vector
    (1 = prediction equals ``y``). The two vectors are compared with
    ``ttest_ind(..., equal_var=False)``.

    Parameters:
        df (pd.DataFrame): Features plus the target stored in column ``"y"``.
        model: Object exposing ``predict(features)``.
        query (str): ``DataFrame.query`` expression selecting the subgroup.

    Returns:
        float: The p-value, or ``np.nan`` when a group is empty or any step
        raises (the error is printed, not propagated).
    """

    def _correct_flags(rows, missing_target_msg, length_mismatch_msg):
        if "y" not in rows.columns:
            raise KeyError(missing_target_msg)

        predicted = model.predict(rows.drop("y", axis=1))
        # Normalise whatever the model returned to a plain ndarray so the
        # comparison below is positional.
        if isinstance(predicted, pd.Series):
            predicted = predicted.values
        elif not isinstance(predicted, np.ndarray):
            predicted = np.array(predicted)

        if len(predicted) != len(rows):
            raise ValueError(length_mismatch_msg)

        return (rows["y"].values == predicted).astype(int)

    try:
        inside_flags = _correct_flags(
            df.query(query),
            "'y' column is missing from the dataframe.",
            "Number of predictions does not match number of samples in the subgroup.",
        )
        outside_flags = _correct_flags(
            df.query(f"not ({query})"),
            "'y' column is missing from the complementary dataframe.",
            "Number of predictions does not match number of samples in the complementary group.",
        )

        # The emptiness check runs only after both groups have been scored.
        if len(inside_flags) == 0 or len(outside_flags) == 0:
            print("One of the groups has no samples. Returning p-value as NaN.")
            return np.nan

        _, p_value = ttest_ind(inside_flags, outside_flags, equal_var=False)
        return p_value
    except Exception as e:
        print(f"Error in welchs_t_test_for_accuracy: {e}")
        return np.nan
def mcnemars_test(df, model, query):
    """
    p-value from ``mcnemar`` comparing observed and expected subgroup correctness.

    The "expected" row of the 2x2 table is the subgroup size split according to
    the model's accuracy on the whole of ``df``; the "observed" row is the
    actual correct/incorrect count inside the subgroup.

    Parameters:
        df (pd.DataFrame): Features plus the target stored in column ``"y"``.
        model: Object exposing ``predict(features)``.
        query (str): ``DataFrame.query`` expression selecting the subgroup.

    Returns:
        The ``pvalue`` attribute of the ``mcnemar`` result.
    """
    # NOTE(review): the table rows are observed vs. expected counts rather than
    # paired outcomes of two classifiers -- confirm this use of McNemar's test
    # is intended.
    all_features = df.drop("y", axis=1)
    baseline_accuracy = accuracy_score(df["y"], model.predict(all_features))

    members = df.query(query)
    n_members = len(members)
    member_preds = model.predict(members.drop("y", axis=1))
    member_truth = members["y"].to_numpy()

    # What the subgroup would look like at the dataset-wide accuracy.
    n_expected_hits = round(baseline_accuracy * n_members)
    n_expected_misses = n_members - n_expected_hits

    # What the subgroup actually looks like.
    n_observed_hits = np.sum(member_preds == member_truth)
    n_observed_misses = n_members - n_observed_hits

    contingency = [
        [n_observed_hits, n_observed_misses],
        [n_expected_hits, n_expected_misses],
    ]
    outcome = mcnemar(contingency, exact=False, correction=True)
    return outcome.pvalue  # pyright: ignore
def calculate_weighted_relative_outcomes(df, query):
    """
    Return the subgroup's support times its outcome gap to the full dataset.

    Support is the fraction of rows of ``df`` selected by ``query``; the gap is
    the subgroup mean of column ``"y"`` minus the mean of ``"y"`` over ``df``.
    """
    members = df.query(query)
    coverage = len(members) / len(df)
    outcome_gap = members["y"].mean() - df["y"].mean()
    return coverage * outcome_gap
def calculate_weighted_relative_accuracy(df, query, model):
    """
    Return the subgroup's support times its accuracy gap to the full dataset.

    Support is the fraction of rows of ``df`` selected by ``query``; the gap is
    the model's accuracy on the subgroup minus its accuracy on all of ``df``.
    The target is read from column ``"y"``.
    """
    members = df.query(query)
    coverage = members.shape[0] / df.shape[0]

    member_accuracy = accuracy_score(members["y"], model.predict(members.drop("y", axis=1)))
    overall_accuracy = accuracy_score(df["y"], model.predict(df.drop("y", axis=1)))

    return coverage * (member_accuracy - overall_accuracy)
def calculate_odds_ratio(df, query):
    """
    Odds ratio of the outcome: ``(p1 / (1 - p1)) / (p0 / (1 - p0))``.

    ``p1`` is the mean of column ``"y"`` in the subgroup selected by ``query``
    and ``p0`` is the mean of ``"y"`` in the rest of the dataset.

    Parameters:
        df (pd.DataFrame): Dataset containing the outcome column ``"y"``.
        query (str): ``DataFrame.query`` expression selecting the subgroup.

    Returns:
        np.float64: The odds ratio. Degenerate rates (0 or 1, or an empty
        group) follow floating-point rules and yield ``inf`` or ``nan`` rather
        than raising.
    """
    df_subgroup = df.query(query)
    df_rest = df.query(f"not ({query})")

    # NumPy scalars make division by zero produce inf/nan instead of raising
    # ZeroDivisionError.
    p1 = np.float64(df_subgroup["y"].mean())
    p0 = np.float64(df_rest["y"].mean())

    # Odds are p / (1 - p). The previous p * (1 - p) form was the Bernoulli
    # variance, so the result was not an odds ratio.
    odds_subgroup = p1 / (1 - p1)
    odds_rest = p0 / (1 - p0)
    return odds_subgroup / odds_rest
def calculate_odds_ratio_acc(df, query, model):
    """
    Odds ratio of the accuracy: ``(p1 / (1 - p1)) / (p0 / (1 - p0))``.

    ``p1`` is the model's accuracy on the subgroup selected by ``query`` and
    ``p0`` is its accuracy on the rest of the dataset. The target is read from
    column ``"y"``.

    Parameters:
        df (pd.DataFrame): Features plus the target stored in column ``"y"``.
        query (str): ``DataFrame.query`` expression selecting the subgroup.
        model: Object exposing ``predict(features)``.

    Returns:
        np.float64: The odds ratio. Degenerate accuracies (0 or 1) follow
        floating-point rules and yield ``inf`` or ``nan`` rather than raising.
    """
    df_subgroup = df.query(query)
    df_rest = df.query(f"not ({query})")

    # NumPy scalars make division by zero produce inf/nan instead of raising
    # ZeroDivisionError.
    p1 = np.float64(accuracy_score(df_subgroup["y"], model.predict(df_subgroup.drop("y", axis=1))))
    p0 = np.float64(accuracy_score(df_rest["y"], model.predict(df_rest.drop("y", axis=1))))

    # Odds are p / (1 - p). The previous p * (1 - p) form was the Bernoulli
    # variance, so the result was not an odds ratio.
    odds_subgroup = p1 / (1 - p1)
    odds_rest = p0 / (1 - p0)
    return odds_subgroup / odds_rest
def calculate_lift(df, query):
    """
    Lift of the outcome: subgroup rate divided by the dataset-wide rate.

    Both rates are means of column ``"y"``; the subgroup is the set of rows
    selected by ``query``.
    """
    subgroup_rate = df.query(query)["y"].mean()
    overall_rate = df["y"].mean()
    return subgroup_rate / overall_rate
def calculate_lift_outcome(df, query, model):
    """
    Lift of the accuracy: subgroup accuracy divided by dataset-wide accuracy.

    Despite the name, the quantity compared is the model's accuracy (not the
    raw outcome): accuracy on the rows selected by ``query`` over accuracy on
    all of ``df``. The target is read from column ``"y"``.
    """
    members = df.query(query)
    member_accuracy = accuracy_score(members["y"], model.predict(members.drop("y", axis=1)))
    overall_accuracy = accuracy_score(df["y"], model.predict(df.drop("y", axis=1)))
    return member_accuracy / overall_accuracy
def calculate_group_statistics(X, y, model, query, X_tr=None, num_iterations=250):
    """
    Summarise how the subgroup selected by ``query`` differs from the full dataset.

    Parameters:
        X (pd.DataFrame): Feature frame the query is evaluated against.
        y (pd.Series): Target values; stored as column ``"y"`` on a copy of ``X``.
        model: Object exposing ``predict``.
        query (str): ``DataFrame.query`` expression; line breaks are replaced
            by spaces and the result is stripped before use.
        X_tr (pd.DataFrame, optional): Alternative feature frame used only for
            ``accuracy_diff``; subgroup rows are looked up by index label.
        num_iterations (int): Unused; kept for backward compatibility.

    Returns:
        dict: Size/support of the subgroup, four p-values for an accuracy
        difference, outcome and accuracy gaps, odds ratios, lifts and weighted
        relative scores, plus the cleaned ``query``. When the query is empty,
        selects no rows, or selects every row, a neutral placeholder dict with
        the same keys is returned instead.
    """
    # Work on a copy so the caller's feature frame does not gain a "y" column.
    df = deepcopy(X)
    df["y"] = y

    # clean query
    query = query.replace("\n", " ").replace("\r", " ").strip()

    # Evaluate the query once; an empty query is never evaluated.
    subgroup = df.query(query) if query else None

    # A subgroup that is empty or covers everything cannot be compared.
    if subgroup is None or len(subgroup) == 0 or len(subgroup) == len(df):
        return {
            "group_size": 0,
            "support": 0,
            "p_value_mc": 1,
            "p_value_t": 1,
            "p_value_chi": 1,
            "p_value_bootstrap": 1,
            "num_criteria": 0,
            "outcome_diff": 0,
            "accuracy_diff": 0,
            "odds_ratio_outcome": np.nan,
            "odds_ratio_acc": np.nan,  # odds ratio of the accuracy
            "query": query,
            "lift_outcome": np.nan,
            "lift_acc": np.nan,  # lift of the accuracy
            "weighted_relative_outcome": np.nan,
            "weighted_relative_accuracy": np.nan,
        }

    # Calculate statistics
    group_size = len(subgroup)
    relative_size = group_size / len(df)
    # Count the ``and`` keyword only as a whole word, so identifiers such as
    # ``brand`` or ``land_area`` do not inflate the count. ``&`` is not counted.
    num_criteria = len(re.findall(r"\band\b", query)) + 1

    # Outcome difference
    avg_outcome_dataset = y.mean()
    avg_outcome_subgroup = subgroup["y"].mean()
    outcome_diff = abs(avg_outcome_dataset - avg_outcome_subgroup)

    # Model accuracy difference
    if X_tr is not None:
        subgroup_tr = X_tr.loc[subgroup.index]
        subgroup_y = y.loc[subgroup.index]
        accuracy_dataset = accuracy_score(y, model.predict(X_tr))
        accuracy_subgroup = accuracy_score(subgroup_y, model.predict(subgroup_tr))
    else:
        accuracy_dataset = accuracy_score(y, model.predict(X))
        accuracy_subgroup = accuracy_score(subgroup["y"], model.predict(subgroup.drop("y", axis=1)))
    accuracy_diff = abs(accuracy_dataset - accuracy_subgroup)

    # NOTE(review): every helper below scores the model on ``df`` (X plus y),
    # even when ``X_tr`` is supplied -- confirm that is intended.
    p_value = mcnemars_test(df, model, query)
    p_value_t = welchs_t_test_for_accuracy(df, model, query)
    p_value_chi = chi_square_test_for_accuracy(df, model, query)

    # Get odds ratio
    odds_ratio = calculate_odds_ratio(df, query)

    # Calculate lift for outcome
    lift = calculate_lift(df, query)

    # Calculate lift for accuracy
    lift_acc = calculate_lift_outcome(df, query, model)

    # Calculate weighted relative accuracy and outcomes
    wro = calculate_weighted_relative_outcomes(df, query)
    wre = calculate_weighted_relative_accuracy(df, query, model)

    # Odds ratio of accuracy
    odds_ratio_acc = calculate_odds_ratio_acc(df, query, model)

    # Bootstrap p-value acc
    pval_bootstrap = bootstrapping_test_for_accuracy(df, model, query)

    return {
        "group_size": group_size,  # size of the subgroup
        "support": relative_size,  # support of the subgroup
        # The four p-values evaluate whether the subgroup's accuracy differs
        # from the average accuracy.
        "p_value_mc": p_value,
        "p_value_t": p_value_t,
        "p_value_chi": p_value_chi,
        "p_value_bootstrap": pval_bootstrap,
        "num_criteria": num_criteria,  # number of criteria in the subgroup
        "outcome_diff": outcome_diff,  # difference in the outcome between the subgroup and the entire dataset
        "accuracy_diff": accuracy_diff,  # difference in the accuracy between the subgroup and the entire dataset
        "odds_ratio_outcome": odds_ratio,  # odds ratio of the outcome
        "odds_ratio_acc": odds_ratio_acc,  # odds ratio of the accuracy
        "query": query,
        "lift_outcome": lift,
        "lift_acc": lift_acc,  # lift of the accuracy
        "weighted_relative_outcome": wro,  # Weighted relative outcomes
        "weighted_relative_accuracy": wre,  # Weighted relative accuracy
    }
def calculate_group_statistics_string(X, y, model, query, ohe, num_iterations=250):
    """
    Subgroup statistics for a model that consumes an encoded version of ``X``.

    The query is evaluated on the raw features in ``X``; the model is scored on
    ``ohe.transform`` of those features.

    Parameters:
        X (pd.DataFrame): Raw feature frame the query is evaluated against.
        y (pd.Series): Target values; stored as column ``"y"`` on a copy of ``X``.
        model: Object exposing ``predict`` on the encoded features.
        query (str): ``DataFrame.query`` expression selecting the subgroup.
        ohe: Fitted encoder exposing ``transform`` and ``get_feature_names_out``.
        num_iterations (int): Unused; kept for backward compatibility.

    Returns:
        dict: ``group_size``, ``support``, ``p_value_bootstrap``,
        ``num_criteria``, ``outcome_diff``, ``accuracy_diff`` and ``query``.
        When the query is empty, selects no rows, or selects every row, a
        neutral placeholder dict is returned instead; it carries additional
        placeholder keys that the regular result does not.
    """

    def _encode(frame):
        # Encode the features of ``frame`` and keep its row labels. Without the
        # explicit index the encoded frame gets a fresh RangeIndex, so the
        # label-aligned ``["y"]`` assignment fills NaN and the bootstrap helper
        # derives the wrong complement from the index.
        encoded = ohe.transform(frame.drop("y", axis=1))
        if hasattr(encoded, "toarray"):
            # Densify results that expose ``toarray`` (e.g. SciPy sparse
            # matrices) before building the DataFrame.
            encoded = encoded.toarray()
        out = pd.DataFrame(encoded, columns=ohe.get_feature_names_out(), index=frame.index)
        out["y"] = frame["y"]
        return out

    # Work on a copy so the caller's feature frame does not gain a "y" column.
    df = deepcopy(X)
    df["y"] = y

    # Evaluate the query once; an empty query is never evaluated.
    subgroup = df.query(query) if len(query) > 0 else None

    # A subgroup that is empty or covers everything cannot be compared.
    if subgroup is None or len(subgroup) == 0 or len(subgroup) == len(df):
        return {
            "group_size": 0,
            "support": 0,
            "p_value_mc": 1,
            "p_value_t": 1,
            "p_value_chi": 1,
            "p_value_bootstrap": 1,
            "num_criteria": 0,
            "outcome_diff": 0,
            "accuracy_diff": 0,
            "odds_ratio_outcome": np.nan,
            "odds_ratio_acc": np.nan,  # odds ratio of the accuracy
            "query": query,
            "lift_outcome": np.nan,
            "lift_acc": np.nan,  # lift of the accuracy
            "weighted_relative_outcome": np.nan,
            "weighted_relative_accuracy": np.nan,
        }

    # Calculate statistics
    group_size = len(subgroup)
    relative_size = group_size / len(df)
    # Count the ``and`` keyword only as a whole word, so identifiers such as
    # ``brand`` or ``land_area`` do not inflate the count. ``&`` is not counted.
    num_criteria = len(re.findall(r"\band\b", query)) + 1

    # Outcome difference
    avg_outcome_dataset = y.mean()
    avg_outcome_subgroup = subgroup["y"].mean()
    outcome_diff = abs(avg_outcome_dataset - avg_outcome_subgroup)

    # Transform the dataframe (except for y) to its encoded form
    df_ohe = _encode(df)
    subgroup_ohe = _encode(subgroup)

    X_ohe = df_ohe.drop("y", axis=1)
    accuracy_dataset = accuracy_score(y, model.predict(X_ohe))
    accuracy_subgroup = accuracy_score(subgroup["y"], model.predict(subgroup_ohe.drop("y", axis=1)))
    accuracy_diff = abs(accuracy_dataset - accuracy_subgroup)

    # Bootstrap p-value acc
    pval_bootstrap = bootstrapping_test_for_accuracy_string(df_ohe, model, subgroup_ohe)

    return {
        "group_size": group_size,  # size of the subgroup
        "support": relative_size,  # support of the subgroup
        "p_value_bootstrap": pval_bootstrap,  # p-value for whether the subgroup's accuracy differs from average accuracy
        "num_criteria": num_criteria,  # number of criteria in the subgroup
        "outcome_diff": outcome_diff,  # difference in the outcome between the subgroup and the entire dataset
        "accuracy_diff": accuracy_diff,  # difference in the accuracy between the subgroup and the entire dataset
        "query": query,
    }
def compute_differences_metrics_two_datasets(metrics_1, metrics_2):
    """
    Compute per-metric differences between two metric dictionaries.

    The two dictionaries describe datasets X1 and X2 that might come from
    different populations, or be simple train-test splits.

    IMPORTANT: Differences are calculated as X2 - X1, so a positive difference
    means that X2 is higher than X1.

    Parameters:
        metrics_1 (dict): Metrics of the first dataset; its keys define the output.
        metrics_2 (dict): Metrics of the second dataset.

    Returns:
        dict: One entry per key of ``metrics_1``. The value is
        ``metrics_2[key] - metrics_1[key]`` when both values are numeric
        (Python or NumPy scalar). It is ``None`` when either value is
        non-numeric or the key is missing from ``metrics_2``.
    """
    # NumPy scalar types are listed explicitly because np.int64 / np.float32
    # are not subclasses of the built-in int / float.
    numeric_types = (int, float, np.integer, np.floating)

    metrics_diff = {}
    for metric, value_1 in metrics_1.items():
        # A missing key in metrics_2 is treated like a non-numeric value.
        value_2 = metrics_2.get(metric)
        # Check if numeric
        if isinstance(value_1, numeric_types) and isinstance(value_2, numeric_types):
            metrics_diff[metric] = value_2 - value_1
        else:
            metrics_diff[metric] = None
    return metrics_diff