In this article, I'm going to introduce some of my most-used Python code snippets for deep learning in PyTorch. During my college journey into machine learning, both in class and in industrial practice, they have been very handy for quickly setting up a model training experiment and getting the job done. Most snippets target use cases in Jupyter Notebook/Lab, but they should work just as well in CLI mode.
## Common Imports

### Experiment with the Ignite library
```python
import pandas as pd
from fastprogress import progress_bar
from argparse import Namespace
from pathlib import Path
import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
import random
from sklearn.preprocessing import normalize
from ignite.engine import Events, create_supervised_trainer, create_supervised_evaluator
from ignite.metrics import Accuracy, MeanSquaredError
```
### Experiment with the fast.ai library
```python
import pandas as pd
from fastprogress import progress_bar
from argparse import Namespace
from pathlib import Path
import torch
from torch import nn
import random
from sklearn.preprocessing import normalize
from fastai import *
from fastai.basic_data import *
from fastai.text import *
from fastai.tabular import *
```
## Initialization

We can use `Namespace` to scope our project-wide arguments. The `set_seeds` function below sets the NumPy and PyTorch random seeds for us. By using the same seed, we make sure that our results can be reproduced on the same hardware every time we rerun the notebook. Toggling the `cuda` argument makes it easy to turn GPU acceleration on and off.
```python
def set_seeds(seed, cuda):
    np.random.seed(seed)  # seed NumPy too, since we use np.random for splitting
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

args = Namespace(
    seed=1234,
    cuda=True,
    norm_main_df_path=google_drive_path + 'data/main.csv',
    sample_length=120,
    batch_size=256,
    num_workers=4
)
set_seeds(seed=args.seed, cuda=args.cuda)

if not torch.cuda.is_available():
    print("CUDA not available")
    args.cuda = False
args.device = torch.device("cuda" if args.cuda else "cpu")
print("Using CUDA: {}".format(args.cuda))
```
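A quick way to convince yourself the seeding works (a minimal sanity check, not part of the original setup):

```python
# Re-seeding should make random draws repeat exactly.
set_seeds(seed=args.seed, cuda=args.cuda)
a = torch.rand(3)
set_seeds(seed=args.seed, cuda=args.cuda)
b = torch.rand(3)
assert torch.equal(a, b)  # same seed, same numbers
```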
## Data Preprocessing

### Add new columns to a DataFrame

```python
origin_df = pd.DataFrame({...})
new_columns = {"col_a": [...], "col_b": [...]}
df_to_merge = pd.DataFrame(new_columns)
df_to_merge.index = origin_df.index
dest_df = origin_df.join(df_to_merge)
```
Functional version
```python
def add_column_to_df(df: pd.DataFrame, new_columns: dict) -> pd.DataFrame:
    df_to_merge = pd.DataFrame(new_columns)
    df_to_merge.index = df.index
    return df.join(df_to_merge)
```
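A quick usage sketch (the column names and values below are made up for illustration):

```python
df = pd.DataFrame({"price": [1.0, 2.0, 3.0]})
df = add_column_to_df(df, {"volume": [10, 20, 30], "label": ["a", "b", "c"]})
print(df.columns.tolist())  # ['price', 'volume', 'label']
```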
## Split Dataset

We can split a DataFrame into a training set and a validation set using the following functions.
Randomly split a DataFrame:
```python
def rand_split_df(df, valid_pct: float = 0.2):
    msk = np.random.rand(len(df)) < valid_pct
    train_df = df[~msk]
    valid_df = df[msk]
    return train_df, valid_df
```
Sequentially split a DataFrame:
```python
def seq_split_df(df, valid_pct: float = 0.2):
    valid_size = int(len(df) * valid_pct)  # use the argument instead of a hard-coded ratio
    return df[:-valid_size], df[-valid_size:]
```
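Both splitters return a `(train, valid)` pair; for example (assuming a hypothetical `main_df`):

```python
train_df, valid_df = rand_split_df(main_df, valid_pct=0.2)  # ~20% of rows, chosen at random
train_df, valid_df = seq_split_df(main_df, valid_pct=0.2)   # the last 20% of rows, kept in order
print(len(train_df), len(valid_df))
```

For time-series data the sequential split is usually the safer choice, since a random split lets "future" rows leak into the training set.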
Randomly split a list:
```python
msk = np.random.rand(len(input_matrices)) < 0.2
train_input_matrices = [x for i, x in enumerate(input_matrices) if not msk[i]]
valid_input_matrices = [x for i, x in enumerate(input_matrices) if msk[i]]
train_truths = [y for i, y in enumerate(ground_truths) if not msk[i]]
valid_truths = [y for i, y in enumerate(ground_truths) if msk[i]]
```
Randomly split an ItemList when using fast.ai:
```python
databunch = main_itemlist.split_by_rand_pct(0.2).databunch(bs=args.batch_size, collate_fn=data_batch_collate)
```
Sequentially split an ItemList when using fast.ai:
```python
main_itemlist_size = len(position_predictions)
train_itemlist_size = int(main_itemlist_size * 0.8)
databunch = main_itemlist.split_by_idx(list(range(train_itemlist_size, main_itemlist_size))).label_from_df().databunch(bs=args.batch_size, collate_fn=data_batch_collate)
```
## Custom Dataset

A simple custom `Dataset` (`torch.utils.data.Dataset`):
```python
class PriceCurveDataset(Dataset):
    def __init__(self, matrices, prices):
        self.matrices = matrices
        self.prices = prices

    def __len__(self):
        return len(self.prices)

    def __getitem__(self, index):
        return self.matrices[index], self.prices[index]
```
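To illustrate (with made-up random data), this dataset plugs straight into a regular `DataLoader`:

```python
# Hypothetical toy data: 1000 samples of 120 time steps x 4 features.
matrices = [torch.rand(120, 4) for _ in range(1000)]
prices = [random.random() for _ in range(1000)]
dataset = PriceCurveDataset(matrices, prices)
dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True)
x, y = next(iter(dataloader))  # x: (batch, 120, 4), y: (batch,)
```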
A more complex and complete version: a custom time-series `Dataset` with oversampling, matrix forming, normalization, etc.
```python
import math  # needed for math.floor / math.ceil below

def split_df(df, valid_pct: float = 0.2):
    valid_size = int(len(df) * valid_pct)
    return df[:-valid_size], df[-valid_size:]

def random_select(a: float, b: float, prob_a: float):
    return random.choices([a, b], weights=[prob_a, 1 - prob_a])[0]

def random_oversmpl_times(n_times: float) -> int:
    # Stochastically round n_times so its expected value stays n_times,
    # e.g. 2.5 becomes 2 or 3 with equal probability.
    floor_val = math.floor(n_times)
    ceil_val = math.ceil(n_times)
    floor_prob = ceil_val - n_times
    return int(random_select(floor_val, ceil_val, floor_prob))

def over_sample_df_with_dict(df, label_col: str, oversmpl_multi_dict: dict):
    # Build an index map that repeats each row roughly `oversmpl_ratio` times.
    index_map_list = []
    for key in oversmpl_multi_dict:
        oversmpl_ratio = oversmpl_multi_dict[key]
        indexes = df.index[df[label_col] == key].tolist()
        for i in indexes:
            random_neighbor = random_oversmpl_times(oversmpl_ratio)
            index_map_list += [i for n in range(random_neighbor)]
    return index_map_list

class TimeSerialClasDataset(Dataset):
    def __init__(self, df, label_col, drop_cols=[], time_len=120,
                 oversmpl_multi_dict: dict = None, normalize_per_item: bool = False):
        self.label_df = df[label_col]
        self.df = df.drop(drop_cols + [label_col], axis=1)
        self.df_len = len(df)
        self.row_width = len(self.df.iloc[0])
        self.padding_row = [0 for i in range(self.row_width)]
        self.time_len = time_len
        self.valid_len = self.df_len
        self.oversmpl_idx_map = None
        if oversmpl_multi_dict is not None:
            self.oversmpl_idx_map = over_sample_df_with_dict(df, label_col, oversmpl_multi_dict)
            self.valid_len = len(self.oversmpl_idx_map)
        self.normalize_per_item = normalize_per_item

    def __len__(self):
        return self.valid_len

    def __getitem__(self, i):
        i = self.get_real_index(i)
        # Take the `time_len` rows ending at position i as one sample.
        begin_i = i - self.time_len + 1
        begin_i = begin_i if begin_i >= 0 else 0
        end_i = i + 1
        x = self.df.iloc[begin_i:end_i].values.tolist()
        if i < self.time_len - 1:
            # Pad with zero rows when there is not enough history yet.
            pad_len = self.time_len - i - 1
            x += [self.padding_row for pad_i in range(pad_len)]
        y = self.label_df.iloc[i]
        if self.normalize_per_item:
            x = normalize(x, axis=0).tolist()
        return x, y

    def get_real_index(self, i):
        if self.oversmpl_idx_map is None:
            return i
        return self.oversmpl_idx_map[i]
```
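For instance (the toy DataFrame and label values here are made up), an `oversmpl_multi_dict` entry of `{'up': 3}` makes each `'up'` row appear roughly three times per epoch:

```python
toy_df = pd.DataFrame({
    'feat_a': np.random.rand(500),
    'feat_b': np.random.rand(500),
    'trend': np.random.choice(['up', 'down'], size=500, p=[0.1, 0.9]),
})
dataset = TimeSerialClasDataset(toy_df, label_col='trend', time_len=120,
                                oversmpl_multi_dict={'up': 3, 'down': 1},
                                normalize_per_item=True)
x, y = dataset[0]  # x: time_len rows of features, y: the label at the window's end
```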
Getting a `DataLoader`, or a `DataBunch` if you are using fast.ai:
```python
def get_seq_clas_databunch(df: DataFrame, label_col: str, label_index_map: dict,
                           path: PathOrStr = '.', drop_cols=[], time_len: int = 120,
                           valid_pct: float = 0.2, train_bs: int = 64, valid_bs: int = 64,
                           num_workers: int = 2, oversmpl_multi_dict: dict = None,
                           normalize_per_item: bool = False):
    def data_batch_collate(batch):
        # Turn a list of (x, label) pairs into tensors, mapping labels to indices.
        x_list = []
        y_list = []
        for item in batch:
            x_list.append(item[0])
            y_list.append(label_index_map[item[1]])
        batch_x = torch.FloatTensor(x_list)
        batch_y = torch.LongTensor(y_list)
        return batch_x, batch_y

    train_df, valid_df = split_df(df, valid_pct)
    train_dataset = TimeSerialClasDataset(train_df, label_col, drop_cols, time_len,
                                          oversmpl_multi_dict, normalize_per_item)
    valid_dataset = TimeSerialClasDataset(valid_df, label_col, drop_cols, time_len,
                                          normalize_per_item=normalize_per_item)
    train_dataloader = DataLoader(train_dataset, batch_size=train_bs, shuffle=True, num_workers=num_workers)
    valid_dataloader = DataLoader(valid_dataset, batch_size=valid_bs, shuffle=True, num_workers=num_workers)
    data = DataBunch(train_dataloader, valid_dataloader, collate_fn=data_batch_collate, path=path)
    return data
```
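Calling it might look like this (`main_df` and the label values are hypothetical):

```python
label_index_map = {'down': 0, 'flat': 1, 'up': 2}
data = get_seq_clas_databunch(main_df, label_col='trend', label_index_map=label_index_map,
                              time_len=args.sample_length, train_bs=args.batch_size,
                              valid_bs=args.batch_size, num_workers=args.num_workers)
```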
## Design Models

### Count parameters of a model

```python
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')
```
### Sample Module Class

```python
class ACRNN(nn.Module):
    def __init__(self, input_dim=4, hidden_dim=60, n_layers=1,
                 linears: list = [100, 20], bidirectional=False):
        super(ACRNN, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.bidirectional = bidirectional
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers=n_layers,
                           batch_first=True, bidirectional=bidirectional)
        # The last time step, max-pooled, and avg-pooled RNN outputs are
        # concatenated, hence the factor of 3.
        last_in_features = hidden_dim * self.get_rnn_layers(False) * 3
        linear_layers = []
        for linear_num in linears:
            linear_layers.append(nn.Linear(last_in_features, linear_num))
            linear_layers.append(nn.ReLU())
            last_in_features = linear_num
        linear_layers.append(nn.Linear(last_in_features, 1))
        self.linears = nn.Sequential(*linear_layers)

    def get_rnn_layers(self, with_n_layers=True):
        return (self.n_layers if with_n_layers else 1) * (2 if self.bidirectional else 1)

    def init_hidden(self, x):
        # Fresh zero hidden/cell states per batch; uses the global `args.device`.
        hidden_state_layers = self.get_rnn_layers()
        self.h_t = torch.zeros(hidden_state_layers, x.size(0), self.hidden_dim).to(args.device)
        self.c_t = torch.zeros(hidden_state_layers, x.size(0), self.hidden_dim).to(args.device)

    def forward(self, x):
        self.init_hidden(x)
        x, (self.h_t, self.c_t) = self.rnn(x, (self.h_t, self.c_t))
        # Pool over the time dimension.
        x_max_pooled = nn.functional.max_pool2d(x, (x.size(1), 1)).flatten(1)
        x_avg_pooled = nn.functional.avg_pool2d(x, (x.size(1), 1)).flatten(1)
        x = x[:, -1, :]  # output at the last time step
        x = torch.cat((x, x_max_pooled, x_avg_pooled), 1)
        x = self.linears(x)
        return x.flatten(0)
```
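As a quick sanity check, we can push a dummy batch through the model to confirm the output shape (a minimal sketch; it assumes `args.device` has been set up as in the Initialization section):

```python
model = ACRNN(input_dim=4, hidden_dim=60, linears=[100, 20]).to(args.device)
dummy = torch.rand(8, 120, 4).to(args.device)  # (batch, time, features)
out = model(dummy)
print(out.shape)  # torch.Size([8]) - one regression value per sample
print(f'The model has {count_parameters(model):,} trainable parameters')
```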
## Train The Model

Vanilla PyTorch style:
```python
from copy import deepcopy
import time

optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.8)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
criterion = nn.MSELoss()
criterion.to(args.device)

dataloaders = {'train': train_dataloader, 'valid': valid_dataloader}
dataset_sizes = {'train': len(train_dataset), 'valid': len(valid_dataset)}

def train_model(model, criterion, optimizer, scheduler, num_epochs=5):
    since = time.time()
    best_model_wts = deepcopy(model.state_dict())
    best_loss = float("inf")

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        for phase in ['train', 'valid']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            for data in dataloaders[phase]:
                inputs = data[0].to(args.device)
                truths = data[1].to(args.device)
                optimizer.zero_grad()
                # Only track gradients in the training phase.
                with torch.set_grad_enabled(phase == 'train'):
                    preds = model(inputs)
                    loss = torch.sqrt(criterion(preds.flatten(), truths))  # RMSE
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                running_loss += loss.item() * inputs.size(0)

            epoch_loss = running_loss / dataset_sizes[phase]
            print('{} Loss: {:.4f}'.format(phase, epoch_loss))

            if phase == 'valid':
                scheduler.step()
            # Keep a copy of the best weights seen so far.
            if phase == 'valid' and epoch_loss < best_loss:
                best_loss = epoch_loss
                best_model_wts = deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Loss: {:.4f}'.format(best_loss))
    model.load_state_dict(best_model_wts)
    return model

model = train_model(model, criterion, optimizer, lr_scheduler, num_epochs=10)
```
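Since `train_model` returns the model loaded with the best-performing weights, you may want to persist them right away (the file name here is arbitrary):

```python
torch.save(model.state_dict(), 'best_model.pth')
# Later, restore with:
model.load_state_dict(torch.load('best_model.pth', map_location=args.device))
```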
Using Ignite:
```python
trainer = create_supervised_trainer(model, optimizer, criterion, device=args.device)
evaluator = create_supervised_evaluator(model, metrics={
    'mse': MeanSquaredError()
}, device=args.device)

@trainer.on(Events.EPOCH_COMPLETED)
def log_training_results(trainer):
    evaluator.run(train_dataloader)
    metrics = evaluator.state.metrics
    print("Training Results - Epoch: {}  Avg loss: {:.2f}"
          .format(trainer.state.epoch, metrics['mse']))

@trainer.on(Events.EPOCH_COMPLETED)
def log_validation_results(trainer):
    evaluator.run(valid_dataloader)
    metrics = evaluator.state.metrics
    print("Validation Results - Epoch: {}  Avg loss: {:.2f}"
          .format(trainer.state.epoch, metrics['mse']))

trainer.run(train_dataloader, max_epochs=4)
```
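If your experiment is classification rather than regression, the `Accuracy` metric imported earlier can be attached in exactly the same way (a minimal sketch; it assumes the model outputs raw class scores and the targets are class indices):

```python
clas_evaluator = create_supervised_evaluator(model, metrics={
    'accuracy': Accuracy()
}, device=args.device)
clas_evaluator.run(valid_dataloader)
print("Validation accuracy: {:.2%}".format(clas_evaluator.state.metrics['accuracy']))
```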
The approach using the fast.ai library will be discussed in later posts.