February 2, 2020

My PyTorch Cookbook

In this article, I’m going to introduce some of my most-used Python code snippets for deep learning in PyTorch. During my college journey into machine learning, both in class and in my industrial practice, they have been very handy for quickly setting up a model training experiment and getting my job done. Most code snippets mainly target use cases under Jupyter Notebook/Lab, but theoretically they should work in CLI mode all the same.

Common Imports

Experiment with Ignite library

1
2
3
4
5
6
7
8
9
10
11
12
13
import pandas as pd
from fastprogress import progress_bar
from argparse import Namespace
from pathlib import Path
import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
import random
from sklearn.preprocessing import normalize

from ignite.engine import Events, create_supervised_trainer, create_supervised_evaluator
from ignite.metrics import Accuracy, MeanSquaredError

Experiment with fast.ai library

1
2
3
4
5
6
7
8
9
10
11
12
13
import pandas as pd
from fastprogress import progress_bar
from argparse import Namespace
from pathlib import Path
import torch
from torch import nn
import random
from sklearn.preprocessing import normalize

from fastai import * # using fastai
from fastai.basic_data import *
from fastai.text import *
from fastai.tabular import *

Initialization

We can use Namespace to scope our project-wide arguments. The set_seeds function here sets the NumPy and PyTorch random seeds for us. By using the same seed, we make sure that our results can be reproduced on the same hardware every time we rerun the notebook.

Toggling the cuda argument here lets us easily turn GPU acceleration on and off.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Set Python, NumPy and PyTorch seeds
def set_seeds(seed, cuda):
    """Seed every RNG the training code may use, for reproducibility.

    Args:
        seed: integer seed shared by all generators.
        cuda: when True, also seed all CUDA devices.
    """
    # The surrounding text promises NumPy seeding, but the original only
    # seeded torch — seed `random` and NumPy too.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

args = Namespace(
    seed=1234,
    cuda=True,
    # NOTE(review): `google_drive_path` must be defined earlier in the
    # notebook — confirm before running this cell standalone.
    norm_main_df_path=google_drive_path + 'data/main.csv',
    sample_length=120,
    batch_size=256,
    num_workers=4
)

# Set seeds
set_seeds(seed=args.seed, cuda=args.cuda)

# Fall back to CPU when CUDA is not actually available.
if not torch.cuda.is_available():
    print("CUDA not available")
    args.cuda = False
args.device = torch.device("cuda" if args.cuda else "cpu")
print("Using CUDA: {}".format(args.cuda))

Data Preprocessing

Add new columns to a DataFrame

1
2
3
4
5
# Build a DataFrame from the new columns, give it the original frame's
# index so the rows line up, then join the two frames column-wise.
origin_df = pd.DataFrame({...})
extra_df = pd.DataFrame({"col_a": [...], "col_b": [...]})
extra_df.index = origin_df.index
dest_df = origin_df.join(extra_df)

Functional version

1
2
3
4
def add_column_to_df(df:pd.DataFrame, new_columns:dict)->pd.DataFrame:
    """Return `df` joined with the columns in `new_columns` (dict of lists)."""
    # Constructing with df's index aligns the new rows with the originals.
    extra = pd.DataFrame(new_columns, index=df.index)
    return df.join(extra)

Split Dataset

We can split a DataFrame into training set and validation set using the following functions.

Randomly split DataFrame:

1
2
3
4
5
6
def rand_split_df(df, valid_pct:float=0.2):
    """Randomly partition `df` into (train_df, valid_df).

    Each row lands in the validation set with probability `valid_pct`,
    so the split sizes are only approximately (1 - valid_pct) / valid_pct.
    """
    in_valid = np.random.rand(len(df)) < valid_pct
    return df[~in_valid], df[in_valid]

Sequentially split DataFrame:

1
2
3
def seq_split_df(df, valid_pct:float=0.2):
    """Split `df` sequentially: the trailing `valid_pct` of rows become the
    validation set, the rest the training set.

    Fixes two defects: the original hard-coded 0.2 instead of using
    `valid_pct`, and `df[:-0]` returned an *empty* training set whenever
    the computed validation size rounded down to zero.
    """
    valid_size = int(len(df) * valid_pct)
    if valid_size == 0:
        # Too few rows for a validation slice: keep everything for training.
        return df, df[:0]
    return df[:-valid_size], df[-valid_size:]

Randomly split list:

1
2
3
4
5
6
7
# Draw one uniform number per sample; a sample belongs to the validation
# split when its draw falls below the 20% threshold.
msk = np.random.rand(len(input_matrices)) < 0.2

train_input_matrices = [x for keep, x in zip(msk, input_matrices) if not keep]
valid_input_matrices = [x for keep, x in zip(msk, input_matrices) if keep]

# The same mask splits the labels, so inputs and truths stay aligned.
train_truths = [y for keep, y in zip(msk, ground_truths) if not keep]
valid_truths = [y for keep, y in zip(msk, ground_truths) if keep]

Randomly split ItemList while using fast.ai:

1
# Hold out a random 20% of items, then bundle into a DataBunch with our collate.
split_lists = main_itemlist.split_by_rand_pct(0.2)
databunch = split_lists.databunch(bs=args.batch_size, collate_fn=data_batch_collate)

Sequentially split ItemList while using fast.ai:

1
2
3
# Hold out the trailing 20% of items as validation, selected by index.
main_itemlist_size = len(position_predictions)
train_itemlist_size = int(main_itemlist_size * 0.8)
valid_idx = list(range(train_itemlist_size, main_itemlist_size))
databunch = (main_itemlist
             .split_by_idx(valid_idx)
             .label_from_df()
             .databunch(bs=args.batch_size, collate_fn=data_batch_collate))

Custom Dataset

torch.utils.data

A simple custom Dataset (torch.utils.data.Dataset)

1
2
3
4
5
6
7
8
9
10
class PriceCurveDataset(Dataset):
    """Minimal Dataset pairing each input matrix with its target price.

    Assumes `matrices` and `prices` are index-aligned sequences of
    equal length.
    """

    def __init__(self, matrices, prices):
        self.matrices = matrices
        self.prices = prices

    def __len__(self):
        # One sample per price entry.
        return len(self.prices)

    def __getitem__(self, index):
        sample = (self.matrices[index], self.prices[index])
        return sample

A more complex and complete version of a custom time-series Dataset with oversampling, matrix forming, normalization, etc.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def split_df(df, valid_pct: float = 0.2):
    """Sequentially split `df`; the trailing `valid_pct` of rows go to validation.

    Fixes two defects: the original hard-coded 0.2 instead of using
    `valid_pct`, and `df[:-0]` returned an empty training set when the
    validation size rounded down to zero.
    """
    valid_size = int(len(df) * valid_pct)
    if valid_size == 0:
        # Too few rows for a validation slice: keep everything for training.
        return df, df[:0]
    return df[:-valid_size], df[-valid_size:]

def random_select(a:float, b:float, prob_a:float):
    """Return `a` with probability `prob_a`, otherwise `b`."""
    picked = random.choices([a, b], weights=[prob_a, 1 - prob_a])
    return picked[0]

def random_oversmpl_times(n_times: float) -> int:
    """Stochastically round `n_times` to an int, preserving its expectation.

    E.g. 2.3 becomes 2 with probability 0.7 and 3 with probability 0.3,
    so E[result] == n_times.
    """
    lo, hi = math.floor(n_times), math.ceil(n_times)
    prob_lo = hi - n_times
    return int(random_select(lo, hi, prob_lo))

def over_sample_df_with_dict(df, label_col: str, oversmpl_multi_dict: dict):
    """Build an index map that repeats rows by label class.

    For every label in `oversmpl_multi_dict`, each matching row index is
    repeated a stochastically-rounded `ratio` number of times, so the
    expected multiplicity of each row equals the requested ratio.
    """
    index_map = []
    for label, ratio in oversmpl_multi_dict.items():
        matching = df.index[df[label_col] == label].tolist()
        for row_idx in matching:
            index_map.extend([row_idx] * random_oversmpl_times(ratio))
    return index_map

class TimeSerialClasDataset(Dataset):
    """Time-series classification dataset over a DataFrame.

    Item i is (x, y): x is the window of `time_len` feature rows ending at
    row i (zero-padded when row i has fewer than `time_len` predecessors)
    and y is the label at row i.  Supports label-keyed oversampling via
    `oversmpl_multi_dict` and optional per-item column normalization.
    """

    def __init__(self, df, label_col, drop_cols=None, time_len=120,
                 oversmpl_multi_dict:dict=None, normalize_per_item:bool=False):
        # Fixed: `drop_cols` previously used a mutable default argument ([]).
        drop_cols = [] if drop_cols is None else drop_cols
        self.label_df = df[label_col]
        self.df = df.drop(drop_cols + [label_col], axis=1)
        self.df_len = len(df)
        self.row_width = len(self.df.iloc[0])
        # All-zeros row used to pad windows near the start of the series.
        self.padding_row = [0 for _ in range(self.row_width)]
        self.time_len = time_len

        self.valid_len = self.df_len

        # Optional oversampling: maps dataset positions to (repeated) row indexes.
        self.oversmpl_idx_map = None
        if oversmpl_multi_dict is not None:
            self.oversmpl_idx_map = over_sample_df_with_dict(df, label_col, oversmpl_multi_dict)
            self.valid_len = len(self.oversmpl_idx_map)
        self.normalize_per_item = normalize_per_item

    def __len__(self):
        return self.valid_len

    def __getitem__(self, i):
        i = self.get_real_index(i)
        begin_i = max(i - self.time_len + 1, 0)
        end_i = i + 1
        x = self.df.iloc[begin_i:end_i].values.tolist()
        if i < self.time_len - 1:
            # NOTE(review): zero padding is appended *after* the real rows;
            # if the model expects the most recent rows at the window's end,
            # this should prepend instead — confirm with the model code.
            pad_len = self.time_len - i - 1
            x += [self.padding_row for _ in range(pad_len)]
        y = self.label_df.iloc[i]
        if self.normalize_per_item:
            # Column-wise normalization (sklearn.preprocessing.normalize).
            x = normalize(x, axis=0).tolist()
        return x, y

    def get_real_index(self, i):
        """Translate a dataset position into a DataFrame row position."""
        if self.oversmpl_idx_map is None:
            return i
        return self.oversmpl_idx_map[i]

Getting DataLoader or DataBunch (if you are using fast.ai):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def get_seq_clas_databunch(df: DataFrame, label_col:str, label_index_map: dict, path:PathOrStr = '.', drop_cols = [],
                           time_len:int = 120, valid_pct: float = 0.2, train_bs: int = 64,
                           valid_bs: int = 64, num_workers:int = 2, oversmpl_multi_dict:dict=None,
                           normalize_per_item:bool=False):
    """Build a fastai DataBunch of sequential-classification samples from `df`."""

    def data_batch_collate(batch):
        # Map raw labels to class indexes and tensorize both halves of the batch.
        xs = [item[0] for item in batch]
        ys = [label_index_map[item[1]] for item in batch]
        return torch.FloatTensor(xs), torch.LongTensor(ys)

    train_df, valid_df = split_df(df, valid_pct)
    # Only the training set is oversampled; validation stays untouched.
    train_dataset = TimeSerialClasDataset(train_df, label_col, drop_cols, time_len,
                                          oversmpl_multi_dict, normalize_per_item)
    valid_dataset = TimeSerialClasDataset(valid_df, label_col, drop_cols, time_len,
                                          normalize_per_item=normalize_per_item)
    train_dataloader = DataLoader(train_dataset, batch_size=train_bs, shuffle=True, num_workers=num_workers)
    valid_dataloader = DataLoader(valid_dataset, batch_size=valid_bs, shuffle=True, num_workers=num_workers)
    return DataBunch(train_dataloader, valid_dataloader, collate_fn=data_batch_collate, path=path)

Design Models

Count parameters of a model

1
2
3
4
def count_parameters(model):
    """Number of trainable (requires_grad) parameters in `model`."""
    total = 0
    for p in model.parameters():
        if p.requires_grad:
            total += p.numel()
    return total

n_params = count_parameters(model)
print(f'The model has {n_params:,} trainable parameters')

Sample Module Class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ACRNN(nn.Module):
    """LSTM regressor: runs an LSTM over the sequence, concatenates
    (last step, max-pool over time, avg-pool over time), and feeds the
    result through a small MLP ending in a single scalar per sample.
    """

    def __init__(self, input_dim = 4, hidden_dim = 60, n_layers = 1, linears: list = None, bidirectional=False):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.bidirectional = bidirectional
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers=n_layers, batch_first=True, bidirectional=bidirectional)

        # Fixed: `linears` previously used a mutable default argument.
        if linears is None:
            linears = [100, 20]

        # x3 because forward() concatenates last-step, max-pooled and
        # avg-pooled LSTM outputs; direction count scales the hidden width.
        last_in_features = hidden_dim * self.get_rnn_layers(False) * 3
        linear_layers = []

        for linear_num in linears:
            linear_layers.append(nn.Linear(last_in_features, linear_num))
            linear_layers.append(nn.ReLU())
            last_in_features = linear_num

        linear_layers.append(nn.Linear(last_in_features, 1))
        self.linears = nn.Sequential(*linear_layers)

    def get_rnn_layers(self, with_n_layers = True):
        """Hidden-state layer count: n_layers (optional) x direction count."""
        return (self.n_layers if with_n_layers else 1) * (2 if self.bidirectional else 1)

    def init_hidden(self, x):
        # Fixed: allocate the hidden state on x's device instead of relying
        # on the notebook-global `args.device`, so the module is self-contained.
        hidden_state_layers = self.get_rnn_layers()
        shape = (hidden_state_layers, x.size(0), self.hidden_dim)
        self.h_t = torch.zeros(*shape, device=x.device)
        self.c_t = torch.zeros(*shape, device=x.device)

    def forward(self, x):
        self.init_hidden(x)

        x, (self.h_t, self.c_t) = self.rnn(x, (self.h_t, self.c_t))

        # Pool over the time dimension (dim 1), then also keep the last step.
        x_max_pooled = nn.functional.max_pool2d(x, (x.size(1), 1)).flatten(1)
        x_avg_pooled = nn.functional.avg_pool2d(x, (x.size(1), 1)).flatten(1)
        x = x[:, -1, :]
        x = torch.cat((x, x_max_pooled, x_avg_pooled), 1)

        x = self.linears(x)
        # (batch, 1) -> (batch,)
        return x.flatten(0)

Train The Model

Vanilla PyTorch style:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Optimizer: SGD with momentum; StepLR decays the LR by 10x every 7 scheduler steps.
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.8)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# Regression loss, moved to the training device.
criterion = nn.MSELoss()
criterion.to(args.device)

from copy import deepcopy
import time

# Phase-keyed lookups consumed by the training loop below.
dataloaders = {'train': train_dataloader, 'valid': valid_dataloader}
dataset_sizes = {'train': len(train_dataset), 'valid': len(valid_dataset)}

def train_model(model, criterion, optimizer, scheduler, num_epochs = 5):
    """Two-phase (train/valid) epoch loop with best-checkpoint restore.

    Tracks the lowest validation loss seen, steps the LR scheduler once
    per epoch (after the valid phase), and reloads the best weights at
    the end.  Relies on the module-level `dataloaders`, `dataset_sizes`
    and `args`.
    """
    start_time = time.time()

    best_model_wts = deepcopy(model.state_dict())
    best_loss = float("inf")

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        for phase in ('train', 'valid'):
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0

            for inputs, truths in dataloaders[phase]:
                inputs = inputs.to(args.device)
                truths = truths.to(args.device)

                optimizer.zero_grad()

                # Track gradients only while training.
                with torch.set_grad_enabled(phase == 'train'):
                    preds = model(inputs)

                    # Alter the MSE to RMSE by adding the sqrt computation
                    loss = torch.sqrt(criterion(preds.flatten(), truths))

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Weight by batch size so the epoch loss is a per-sample mean.
                running_loss += loss.item() * inputs.size(0)

            epoch_loss = running_loss / dataset_sizes[phase]

            print('{} Loss: {:.4f}'.format(phase, epoch_loss))

            if phase == 'valid':
                # One scheduler step per epoch, taken after validation.
                scheduler.step()

                # Keep a copy of the best-performing weights so far.
                if epoch_loss < best_loss:
                    best_loss = epoch_loss
                    best_model_wts = deepcopy(model.state_dict())

    elapsed = time.time() - start_time
    print('Training complete in {:.0f}m {:.0f}s'.format(
        elapsed // 60, elapsed % 60))
    print('Best val Loss: {:4f}'.format(best_loss))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model


# Train for 10 epochs and keep the best checkpoint.
model = train_model(model, criterion, optimizer, lr_scheduler, num_epochs=10)

Using Ignite:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# Supervised trainer plus an evaluator that reports MSE over a dataloader.
trainer = create_supervised_trainer(model, optimizer, criterion, device=args.device)
evaluator = create_supervised_evaluator(
    model,
    metrics={
        'mse': MeanSquaredError()
    },
    device=args.device)

# Uncomment to log the loss after every iteration:
# @trainer.on(Events.ITERATION_COMPLETED)
# def log_training_loss(trainer):
#     print("Epoch[{}] Loss: {:.2f}".format(trainer.state.epoch, trainer.state.output))

@trainer.on(Events.EPOCH_COMPLETED)
def log_training_results(trainer):
    # Re-run the evaluator over the training set for an epoch-end MSE.
    evaluator.run(train_dataloader)
    metrics = evaluator.state.metrics
    print("Training Results - Epoch: {} Avg loss: {:.2f}"
          .format(trainer.state.epoch, metrics['mse']))

@trainer.on(Events.EPOCH_COMPLETED)
def log_validation_results(trainer):
    evaluator.run(valid_dataloader)
    metrics = evaluator.state.metrics
    print("Validation Results - Epoch: {} Avg loss: {:.2f}"
          .format(trainer.state.epoch, metrics['mse']))

trainer.run(train_dataloader, 4)  # 4 epochs (the original comment said 3)

The approach utilizing fast.ai will be discussed in later posts.

About this Post

This post is written by Dizy, licensed under CC BY-NC 4.0.