# Source code for climb.tool.impl.tool_imputation

import os
from typing import Any, Dict, List, Optional

import pandas as pd
from hyperimpute.plugins.imputers import Imputers
from hyperimpute.utils.serialization import save

from ..tool_comms import ToolCommunicator, ToolReturnIter, execute_tool
from ..tools import ToolBase


def update_dataframe(df_orig: pd.DataFrame, df_new: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df_orig`` refreshed with the matching columns of ``df_new``.

    Only columns of ``df_new`` whose names also occur in ``df_orig`` take part;
    they are written into a copy of ``df_orig`` via ``DataFrame.update``.
    The update is applied to the copy, not to ``df_orig`` itself.

    Args:
        df_orig: Frame providing the full set of columns of the result.
        df_new: Frame providing replacement values.

    Returns:
        The updated copy of ``df_orig``.
    """
    result = df_orig.copy()
    known_columns = df_orig.columns
    # Keep df_new's column order, restricted to names df_orig also has.
    overlap = list(filter(lambda name: name in known_columns, df_new.columns))
    result.update(df_new[overlap])
    return result
class BasicHook:
    """Callable progress reporter that counts how often it has been invoked.

    Each call increments ``count`` and prints it against the expected total
    ``iters`` given at construction time.
    """

    def __init__(self, iters: int) -> None:
        self.count = 0
        self.iters = iters

    def __call__(self, *args: Any, **kwargs: Any) -> None:
        # Whatever the caller passes is accepted and ignored; only the call
        # itself matters for progress tracking.
        self.count = self.count + 1
        progress = f"{self.count}/{self.iters}"
        print(f"HyperImpute: Iterations completed {progress}")
def hyperimpute_impute(
    tc: ToolCommunicator,
    data_file_path: str,
    imputed_file_path: str,
    workspace: str,
    subset: Optional[List[str]] = None,
) -> None:
    """Impute missing values in a CSV file with HyperImpute and save the result.

    The CSV at ``data_file_path`` is loaded, the ``"hyperimpute"`` imputer is
    fitted on it (or on the ``subset`` columns only), the serialized imputer is
    written to ``<workspace>/hyperimpute__<imputed file basename>.pkl`` and the
    imputed frame is written to ``imputed_file_path`` without the index.
    Progress is reported through ``tc.print`` and the final summary through
    ``tc.set_returns``.

    Args:
        tc: Communicator used for progress output and for the tool's return value.
        data_file_path: Path of the CSV file to impute.
        imputed_file_path: Path the imputed CSV is written to.
        workspace: Directory in which the serialized imputer is stored.
        subset: Optional list of column names to impute. When falsy, all
            columns are imputed.
    """
    df = pd.read_csv(data_file_path)
    df_use = df[subset] if subset else df

    if subset:
        tc.print("Imputing subset of columns:")
        for col in subset:
            tc.print(f"* {col}")
    else:
        tc.print("Imputing all columns")

    tc.print("Setting up HyperImpute Imputer...")
    n_inner_iter = 3
    imputer = Imputers().get(
        "hyperimpute",
        n_inner_iter=n_inner_iter,
        classifier_seed=[
            "random_forest",
            "logistic_regression",
            "catboost",
            # NOTE: "xgboost" is left out on purpose: it fails for e.g. the CF
            # dataset with an error related to lack of values of certain classes.
            # TODO: requires proper investigation and fixing.
        ],
        inner_loop_hook=BasicHook(n_inner_iter),
    )

    tc.print("Imputing data...")
    tc.print("Missing values before imputation:")
    tc.print(df_use.isnull().sum())
    n_missing_before = df_use.isnull().sum().sum()

    out = imputer.fit_transform(df_use.copy())
    if subset:
        # Merge the imputed subset back into the full frame.
        out = update_dataframe(df, out)

    # Save the imputation model:
    imputed_file_basename = os.path.splitext(os.path.basename(imputed_file_path))[0]
    model_file_path = os.path.join(workspace, f"hyperimpute__{imputed_file_basename}.pkl")
    tc.print("Saving imputation model to:", model_file_path)
    model_bytes = save(imputer)
    with open(model_file_path, "wb") as f:
        f.write(model_bytes)

    tc.print("Missing values after imputation:")
    tc.print(out.isnull().sum())
    # Count the remaining gaps over the same columns as ``n_missing_before``.
    # With a subset, ``out`` also holds the untouched columns, and their NaNs
    # must not be subtracted from the number of imputed values.
    out_imputed_columns = out[subset] if subset else out
    n_missing_after = out_imputed_columns.isnull().sum().sum()

    tc.print("Saving imputed data...")
    out.to_csv(imputed_file_path, index=False)

    tc.set_returns(
        tool_return=(
            f"{n_missing_before - n_missing_after} missing values were imputed. "
            f"The imputed data has been saved to {imputed_file_path}"
        ),
        files_in=[os.path.basename(data_file_path)],
        files_out=[os.path.basename(imputed_file_path)],
    )
class HyperImputeImputation(ToolBase):
    """Tool that launches ``hyperimpute_impute`` for a single data file."""

    def _execute(self, **kwargs: Any) -> ToolReturnIter:
        # Both file arguments are resolved against the tool's working directory,
        # which also serves as the workspace for the saved imputation model.
        work_dir = self.working_directory
        source_path = os.path.join(work_dir, kwargs["data_file_path"])
        target_path = os.path.join(work_dir, kwargs["imputed_file_path"])
        columns = kwargs.get("subset")

        thread, stream = execute_tool(
            hyperimpute_impute,
            wd=work_dir,
            data_file_path=source_path,
            imputed_file_path=target_path,
            workspace=work_dir,
            subset=columns,
        )
        self.tool_thread = thread
        return stream

    @property
    def name(self) -> str:
        return "hyperimpute_imputation"

    @property
    def description(self) -> str:
        # The literal is reproduced exactly as it appears in the source listing.
        return """ Uses the **HyperImpute** library to automatically impute missing values in your data. """

    @property
    def specification(self) -> Dict[str, Any]:
        # Function-calling schema, assembled from the inside out.
        properties: Dict[str, Any] = {
            "data_file_path": {"type": "string", "description": "Path to the data file."},
            "imputed_file_path": {
                "type": "string",
                "description": "Path to the imputed data file.",
            },
            "subset": {
                "type": "array",
                "description": "Optional subset of columns to impute. If not provided, all columns will be imputed.",
                "items": {"type": "string"},
            },
        }
        function: Dict[str, Any] = {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": ["data_file_path", "imputed_file_path"],
            },
        }
        return {"type": "function", "function": function}

    @property
    def description_for_user(self) -> str:
        return "uses the **HyperImpute** library to automatically impute missing values in your data."
# === TRAIN-TEST SPLIT VERSION (TODO: Consolidate) ===
def hyperimpute_impute_train_test(
    tc: ToolCommunicator,
    training_data_path: str,
    workspace: str,
    test_data_path: Optional[str],
    subset: Optional[List[str]] = None,
) -> None:
    """Fit HyperImpute on the training CSV and impute the training and test data.

    Both input paths are joined onto ``workspace``. The imputer is fitted on
    the training data (or on its ``subset`` columns), serialized to
    ``<workspace>/hyperimpute__<training basename>.pkl``, and the imputed
    training data is written to ``<workspace>/<training basename>_imputed.csv``.
    If ``test_data_path`` is given, the fitted imputer is applied to the test
    data with ``transform`` and the result is written to
    ``<workspace>/<test basename>_imputed.csv``. Progress is reported through
    ``tc.print`` and the final summary through ``tc.set_returns``.

    Args:
        tc: Communicator used for progress output and for the tool's return value.
        training_data_path: Training CSV path, joined onto ``workspace``.
        workspace: Directory used to resolve inputs and to store all outputs.
        test_data_path: Optional test CSV path, joined onto ``workspace``.
        subset: Optional list of column names to impute. When falsy, all
            columns are imputed.
    """

    def modify_basename_only(file_path: str, pattern: str) -> str:
        # Substitute the placeholder with the file's extension-less basename
        # and place the resulting name directly inside the workspace.
        filename_no_extension = os.path.splitext(os.path.basename(file_path))[0]
        new_name = pattern.replace("<filename_no_extension>", filename_no_extension)
        return os.path.join(workspace, new_name)

    training_data_path = os.path.join(workspace, training_data_path)
    training_imputed_file_path = modify_basename_only(training_data_path, "<filename_no_extension>_imputed.csv")
    tc.print("Training data path:", training_data_path)
    tc.print("Imputed training data path to be used:", training_imputed_file_path)

    # NOTE: the test-related names below are only bound when a test path was
    # supplied; every later use is guarded by the same ``if test_data_path``.
    if test_data_path:
        test_data_path = os.path.join(workspace, test_data_path)
        test_imputed_file_path = modify_basename_only(test_data_path, "<filename_no_extension>_imputed.csv")
        tc.print("\nTest data path:", test_data_path)
        tc.print("Imputed test data path to be used:", test_imputed_file_path)

    df_train = pd.read_csv(training_data_path)
    df_train_use = df_train[subset] if subset else df_train
    if test_data_path:
        # Load the test data before training so a bad path or a missing
        # column surfaces before the imputer is fitted.
        df_test = pd.read_csv(test_data_path)
        df_test_use = df_test[subset] if subset else df_test

    model_file_path = modify_basename_only(training_data_path, "hyperimpute__<filename_no_extension>.pkl")
    tc.print("\nHyperImpute model file path:", model_file_path)

    if subset:
        tc.print("\nImputing subset of columns:")
        for col in subset:
            tc.print(f"* {col}")
    else:
        tc.print("\nImputing all columns")

    tc.print("\nSetting up HyperImpute Imputer...")
    n_inner_iter = 3
    imputer = Imputers().get(
        "hyperimpute",
        n_inner_iter=n_inner_iter,
        classifier_seed=[
            "random_forest",
            "logistic_regression",
            "catboost",
            # NOTE: "xgboost" is left out on purpose: it fails for e.g. the CF
            # dataset with an error related to lack of values of certain classes.
            # TODO: requires proper investigation and fixing.
        ],
        inner_loop_hook=BasicHook(n_inner_iter),
    )

    tc.print("\nMissing values before imputation (training set):")
    tc.print(df_train_use.isnull().sum())
    n_missing_before = df_train_use.isnull().sum().sum()

    tc.print("\nTraining HyperImpute model...")
    out = imputer.fit_transform(df_train_use.copy())
    if subset:
        # Merge the imputed subset back into the full training frame.
        out = update_dataframe(df_train, out)

    # Save the imputation model:
    tc.print("Saving imputation model to:", model_file_path)
    model_bytes = save(imputer)
    with open(model_file_path, "wb") as f:
        f.write(model_bytes)

    tc.print("\nMissing values after imputation (training set):")
    tc.print(out.isnull().sum())
    # Count the remaining gaps over the same columns as ``n_missing_before``.
    # With a subset, ``out`` also holds the untouched columns, and their NaNs
    # must not be subtracted from the number of imputed values.
    out_imputed_columns = out[subset] if subset else out
    n_missing_after = out_imputed_columns.isnull().sum().sum()

    tc.print(f"\nSaving imputed data (training dataset)...\nSaving imputed data to: {training_imputed_file_path}")
    out.to_csv(training_imputed_file_path, index=False)

    # A separator after the dataset labels keeps the label from running into
    # the number that follows it.
    tool_return = (
        "Training dataset: "
        f"{n_missing_before - n_missing_after} missing values were imputed. "
        f"The imputed data has been saved to {training_imputed_file_path}"
    )
    files_in = [os.path.basename(training_data_path)]
    files_out = [os.path.basename(training_imputed_file_path)]

    if test_data_path:
        tc.print("\nImputing test data...")

        tc.print("\nMissing values before imputation (test set):")
        tc.print(df_test_use.isnull().sum())
        n_missing_before_test = df_test_use.isnull().sum().sum()

        # The test data is only transformed with the imputer fitted above.
        out_test = imputer.transform(df_test_use.copy())
        if subset:
            out_test = update_dataframe(df_test, out_test)

        tc.print("\nMissing values after imputation (test set):")
        tc.print(out_test.isnull().sum())
        # Same column scope as ``n_missing_before_test``.
        out_test_imputed_columns = out_test[subset] if subset else out_test
        n_missing_after_test = out_test_imputed_columns.isnull().sum().sum()

        tc.print(f"\nSaving imputed data (test dataset)...\nSaving imputed data to: {test_imputed_file_path}")
        out_test.to_csv(test_imputed_file_path, index=False)

        tool_return += (
            "\n\nTest dataset: "
            f"{n_missing_before_test - n_missing_after_test} missing values were imputed. "
            f"The imputed data has been saved to {test_imputed_file_path}"
        )
        files_in.append(os.path.basename(test_data_path))
        files_out.append(os.path.basename(test_imputed_file_path))

    tc.set_returns(
        tool_return=tool_return,
        files_in=files_in,
        files_out=files_out,
    )
class HyperImputeImputationTrainTest(ToolBase):
    """Tool that launches ``hyperimpute_impute_train_test`` for a train/test pair."""

    def _execute(self, **kwargs: Any) -> ToolReturnIter:
        # Paths are forwarded as given; the launched function receives the
        # working directory as its workspace.
        work_dir = self.working_directory
        train_path = kwargs["training_data_path"]
        test_path = kwargs.get("test_data_path")
        columns = kwargs.get("subset")

        thread, stream = execute_tool(
            hyperimpute_impute_train_test,
            wd=work_dir,
            training_data_path=train_path,
            test_data_path=test_path,
            workspace=work_dir,
            subset=columns,
        )
        self.tool_thread = thread
        return stream

    @property
    def name(self) -> str:
        return "hyperimpute_imputation_train_test"

    @property
    def description(self) -> str:
        # The literal is reproduced exactly as it appears in the source listing.
        return """ Uses the **HyperImpute** library to automatically impute missing values in your data (fit on and transform the training data, transform the test data). The imputed data is saved with `_imputed` appended to the filename(s). """

    @property
    def specification(self) -> Dict[str, Any]:
        # Function-calling schema, assembled from the inside out.
        properties: Dict[str, Any] = {
            "training_data_path": {"type": "string", "description": "Path to the training data file."},
            "test_data_path": {"type": "string", "description": "Optional path to the test data file."},
            "subset": {
                "type": "array",
                "description": "Optional subset of columns to impute. If not provided, all columns will be imputed.",
                "items": {"type": "string"},
            },
        }
        function: Dict[str, Any] = {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": ["training_data_path"],
            },
        }
        return {"type": "function", "function": function}

    @property
    def description_for_user(self) -> str:
        return "uses the **HyperImpute** library to automatically impute missing values in your data."