Source code for climb.tool.impl.data_suite.models.nn_conformal

import copy
import sys
from functools import partial

import numpy as np
import torch
import torch.nn as nn
from nonconformist.base import RegressorAdapter
from sklearn.model_selection import train_test_split

if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"


[docs] class MSENet_RegressorAdapter(RegressorAdapter): def __init__( self, model, fit_params=None, in_shape=1, hidden_size=1, learn_func=torch.optim.Adam, epochs=1000, batch_size=10, dropout=0.1, lr=0.01, wd=1e-6, test_ratio=0.2, random_state=0, ): super(MSENet_RegressorAdapter, self).__init__(model, fit_params) # Instantiate model self.epochs = epochs self.batch_size = batch_size self.dropout = dropout self.lr = lr self.wd = wd self.test_ratio = test_ratio self.random_state = random_state self.model = mse_model( in_shape=in_shape, hidden_size=hidden_size, dropout=dropout, ) self.loss_func = torch.nn.MSELoss() self.learner = LearnerOptimized( self.model, partial(learn_func, lr=lr, weight_decay=wd), self.loss_func, device=device, test_ratio=self.test_ratio, random_state=self.random_state, )
[docs] def fit(self, x, y): """ > The function takes in a set of inputs and outputs, and uses them to train the model Args: x: The input data y: The target values. """ self.learner.fit(x, y, self.epochs, batch_size=self.batch_size)
[docs] def predict(self, x): """ > The predict function takes in a single data point and returns the prediction Args: x: the input data Returns: The predicted value of the input x """ return self.learner.predict(x)
if torch.cuda.is_available(): device = "cuda:0" else: device = "cpu" ############################################################################### # Helper functions ###############################################################################
[docs] def train_loop( model, loss_func, x_train, y_train, batch_size, optimizer, cnt=0, best_cnt=np.inf, ): """ > The function defines a training loop for the model Args: model: the model we're training loss_func: the loss function we want to use x_train: the training data y_train: the training labels batch_size: The number of samples to use for each gradient update. optimizer: the optimizer to use. cnt: the number of batches we've trained on. Defaults to 0 best_cnt: the number of epochs to train for. Returns: The epoch loss and the count """ model.train() shuffle_idx = np.arange(x_train.shape[0]) np.random.shuffle(shuffle_idx) x_train = x_train[shuffle_idx] y_train = y_train[shuffle_idx] epoch_losses = [] for idx in range(0, x_train.shape[0], batch_size): cnt = cnt + 1 optimizer.zero_grad() batch_x = x_train[idx : min(idx + batch_size, x_train.shape[0]), :] batch_y = y_train[idx : min(idx + batch_size, y_train.shape[0])] preds = model(batch_x) loss = loss_func(preds, batch_y) loss.backward() optimizer.step() epoch_losses.append(loss.cpu().detach().numpy()) if cnt >= best_cnt: break epoch_loss = np.mean(epoch_losses) return epoch_loss, cnt
############################################################################### # Deep conditional mean regression # Minimizing MSE loss ############################################################################### # Define the network
[docs] class mse_model(nn.Module): """Conditional mean estimator, formulated as neural net""" def __init__(self, in_shape=1, hidden_size=64, dropout=0.5): """Initialization Parameters ---------- in_shape : integer, input signal dimension (p) hidden_size : integer, hidden layer dimension dropout : float, dropout rate """ super().__init__() self.in_shape = in_shape self.out_shape = 1 self.hidden_size = hidden_size self.dropout = dropout self.build_model() self.init_weights()
[docs] def build_model(self): """Construct the network""" self.base_model = nn.Sequential( nn.Linear(self.in_shape, self.hidden_size), nn.ReLU(), nn.Dropout(self.dropout), nn.Linear(self.hidden_size, self.hidden_size), nn.ReLU(), nn.Dropout(self.dropout), nn.Linear(self.hidden_size, 1), )
[docs] def init_weights(self): """Initialize the network parameters""" for m in self.base_model: if isinstance(m, nn.Linear): nn.init.orthogonal_(m.weight) nn.init.constant_(m.bias, 0)
[docs] def forward(self, x): """Run forward pass""" return torch.squeeze(self.base_model(x))
# Define the training procedure
[docs] class LearnerOptimized: """Fit a neural network (conditional mean) to training data""" def __init__( self, model, optimizer_class, loss_func, device="cpu", test_ratio=0.2, random_state=0, ): """Initialization Parameters ---------- model : class of neural network model optimizer_class : class of SGD optimizer (e.g. Adam) loss_func : loss to minimize device : string, "cuda:0" or "cpu" test_ratio : float, test size used in cross-validation (CV) random_state : int, seed to be used in CV when splitting to train-test """ self.model = model.to(device) self.optimizer_class = optimizer_class self.optimizer = optimizer_class(self.model.parameters()) self.loss_func = loss_func.to(device) self.device = device self.test_ratio = test_ratio self.random_state = random_state self.loss_history = [] self.test_loss_history = [] self.full_loss_history = []
[docs] def fit(self, x, y, epochs, batch_size, verbose=False): """Fit the model to data Parameters ---------- x : numpy array, containing the training features (nXp) y : numpy array, containing the training labels (n) epochs : integer, maximal number of epochs batch_size : integer, mini-batch size for SGD """ sys.stdout.flush() model = copy.deepcopy(self.model) model = model.to(device) optimizer = self.optimizer_class(model.parameters()) best_epoch = epochs x_train, xx, y_train, yy = train_test_split( x, y, test_size=self.test_ratio, random_state=self.random_state, ) x_train = ( torch.from_numpy(x_train) .float() .to( self.device, ) .requires_grad_(False) ) xx = torch.from_numpy(xx).float().to(self.device).requires_grad_(False) y_train = ( torch.from_numpy(y_train) .float() .to( self.device, ) .requires_grad_(False) ) yy = torch.from_numpy(yy).float().to(self.device).requires_grad_(False) best_cnt = 1e10 best_test_epoch_loss = 1e10 cnt = 0 for e in range(epochs): epoch_loss, cnt = train_loop( model, self.loss_func, x_train, y_train, batch_size, optimizer, cnt, ) self.loss_history.append(epoch_loss) # print(epoch_loss) # test model.eval() preds = model(xx) test_preds = preds.cpu().detach().numpy() test_preds = np.squeeze(test_preds) test_epoch_loss = self.loss_func(preds, yy).cpu().detach().numpy() self.test_loss_history.append(test_epoch_loss) if test_epoch_loss <= best_test_epoch_loss: best_test_epoch_loss = test_epoch_loss best_epoch = e best_cnt = cnt if (e + 1) % 10 == 0 and verbose: print( "CV: Epoch {}: Train {}, Test {}, Best epoch {}, Best loss {}".format( e + 1, epoch_loss, test_epoch_loss, best_epoch, best_test_epoch_loss, ), ) sys.stdout.flush() # use all the data to train the model, for best_cnt steps x = torch.from_numpy(x).float().to(self.device).requires_grad_(False) y = torch.from_numpy(y).float().to(self.device).requires_grad_(False) cnt = 0 for e in range(best_epoch + 1): if cnt > best_cnt: break epoch_loss, cnt = train_loop( self.model, self.loss_func, x, y, batch_size, self.optimizer, cnt, best_cnt, ) self.full_loss_history.append(epoch_loss) if (e + 1) % 100 == 0 and verbose: print( "Full: Epoch {}: {}, cnt {}".format( e + 1, epoch_loss, cnt, ), ) sys.stdout.flush()
[docs] def predict(self, x): """Estimate the label given the features Parameters ---------- x : numpy array of training features (nXp) Returns ------- ret_val : numpy array of predicted labels (n) """ self.model.eval() ret_val = ( self.model( torch.from_numpy(x) .to( self.device, ) .requires_grad_(False), ) .cpu() .detach() .numpy() ) return ret_val