TL;DR

本文简单交代了神经网络的基本套路以及部分实用组件,以简化开发过程。

Set up

通常我们需要为重复实验设置很多seed,所以我们可以将其打包到一个函数里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import numpy as np
import pandas as pd
import random
import torch
import torch.nn as nn

def set_seeds(seed=1024):
"""Set seeds for reproducibility."""
np.random.seed(seed)
random.seed(seed)
touch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed) # multi-GPU

set_seeds(seed=1024)

Device

当我们有大型数据集和更大的模型要训练时,我们可以通过在 GPU 上并行化张量操作来加速。

1
2
3
cuda = True
device = torch.device("cuda" if(torch.cuda.is_available() and cuda) else "cpu")
torch.set_default_tensor_type({"cuda": "torch.cuda.FloatTensor", "cpu": "torch.FloatTensor"}.get(str(device)))

Load data

这里依然使用前文引入的螺旋数据作为演示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import matplotlib.pyplot as plt
import pandas as pd

# Load data
url = "http://s3.mindex.xyz/datasets/9378f64fc8dd2817e4c92be0a3bae8e7.csv"
df = pd.read_csv(url, header=0) # load
df = df.sample(frac=1).reset_index(drop=True) # shuffle
df.head()

# Data shapes
X = df[["X1", "X2"]].values
y = df["color"].values
print ("X: ", np.shape(X))
print ("y: ", np.shape(y))

# Output
# X: (1500, 2)
# y: (1500,)

# Visualize data
plt.title("Generated non-linear data")
colors = {"c1": "red", "c2": "yellow", "c3": "blue"}
plt.scatter(X[:, 0], X[:, 1], c=[colors[_y] for _y in y], edgecolors="k", s=25)
plt.show()

Split data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import collections
from sklearn.model_selection import train_test_split

TRAIN_SIZE = 0.7
VAL_SIZE = 0.15
TEST_SIZE = 0.15

def train_val_test_split(X, y, train_size):
X_train, X_, y_train, y_ = train_test_split(X, y, train_size=TRAIN_SIZE, stratify=y)
X_test, X_val, y_test, y_val = train_test_split(X_, y_, train_size=0.5, stratify=y_)
return X_train, X_val, X_test, y_train, y_val, y_test

# Create data splits
X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split(
X=X, y=y, train_size=TRAIN_SIZE)
print (f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print (f"X_val: {X_val.shape}, y_val: {y_val.shape}")
print (f"X_test: {X_test.shape}, y_test: {y_test.shape}")
print (f"Sample point: {X_train[0]}{y_train[0]}")

# Output
# X_train: (1050, 2), y_train: (1050,)
# X_val: (225, 2), y_val: (225,)
# X_test: (225, 2), y_test: (225,)
# Sample point: [0.17003003 0.63079261] → c3

Label encoding

接下来定义一个 LabelEncoder 来将文本标签编码成唯一的索引。

这里不再使用 scikit-learn 的 LabelEncoder,因为我们希望能够以我们想要的方式保存和加载我们的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import itertools


class LabelEncoder(object):
"""Label encoder for tag labels."""
def __init__(self, class_to_index=None):
self.class_to_index = class_to_index or {}
self.index_to_class = {v: k for k, v in self.class_to_index.items()}
self.classes = list(self.class_to_index.keys())

def __len__(self):
return len(self.class_to_index)

def __str__(self):
return f"<LabelEncoder(num_classes={len(self)}>"

def fit(self, y):
classes = np.unique(y)
for i, class_ in enumerate(classes):
self.class_to_index[class_] = i
self.index_to_class = {v: k for k, v in self.class_to_index.items()}
self.classes = list(self.class_to_index.keys())

def encode(self, y):
encoded = np.zeros((len(y)), dtype=int)
for i, item in enumerate(y):
encoded[i] = self.class_to_index[item]
return encoded

def decode(self, y):
classes = []
for i, item in enumerate(y):
classes.append(self.index_to_class[item])
return classes

def save(self, fp):
with open(fp, "w") as fp:
contents = {'class_to_index': self.class_to_index}
json.dump(contents, fp, indent=4, sort_keys=False)

@classmethod
def load(cls, fp):
with open(fp, "r") as fp:
kwargs = json.load(fp=fp)
return cls(**kwargs)

# Encode
label_encoder = LabelEncoder()
label_encoder.fit(y_train)
label_encoder.class_to_index

# Convert labels to tokens
print (f"y_train[0]: {y_train[0]}")
y_train = label_encoder.encode(y_train)
y_val = label_encoder.encode(y_val)
y_test = label_encoder.encode(y_test)
print (f"y_train[0]: {y_train[0]}")

# Class weights
counts = np.bincount(y_train)
class_weights = {i: 1.0/count for i, count in enumerate(counts)}
print (f"counts: {counts}\nweights: {class_weights}")

# Output
# y_train[0]: c3
# y_train[0]: 2
# counts: [350 350 350]
# weights: {0: 0.002857142857142857, 1: 0.002857142857142857, 2: 0.002857142857142857}

Standardize data

我们需要标准化我们的数据(零均值和单位方差),这样特定特征的大小就不会影响模型学习其权重的方式。

我们只对输入X进行标准化,因为我们的输出y是类值。

我们将编写自己的 StandardScaler 类,以便在推理过程中轻松保存和加载它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class StandardScaler(object):
def __init__(self, mean=None, std=None):
self.mean = np.array(mean)
self.std = np.array(std)

def fit(self, X):
self.mean = np.mean(X_train, axis=0)
self.std = np.std(X_train, axis=0)

def scale(self, X):
return (X - self.mean) / self.std

def unscale(self, X):
return (X * self.std) + self.mean

def save(self, fp):
with open(fp, "w") as fp:
contents = {"mean": self.mean.tolist(), "std": self.std.tolist()}
json.dump(contents, fp, indent=4, sort_keys=False)

@classmethod
def load(cls, fp):
with open(fp, "r") as fp:
kwargs = json.load(fp=fp)
return cls(**kwargs)


# Standardize the data (mean=0, std=1) using training data
X_scaler = StandardScaler()
X_scaler.fit(X_train)

# Check (means should be ~0 and std should be ~1)
print (f"X_test[0]: mean: {np.mean(X_test[:, 0], axis=0):.1f}, std: {np.std(X_test[:, 0], axis=0):.1f}")
print (f"X_test[1]: mean: {np.mean(X_test[:, 1], axis=0):.1f}, std: {np.std(X_test[:, 1], axis=0):.1f}")

# Output
# X_test[0]: mean: 0.0, std: 1.0
# X_test[1]: mean: -0.0, std: 1.0

DataLoader

我们将把数据放在 Dataset 中,并使用 DataLoader 来有效地创建用于训练和验证的批次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Dataset(torch.utils.data.Dataset):
def __init__(self, X, y):
self.X = X
self.y = y

def __len__(self):
return len(self.y)

def __str__(self):
return f"<Dataset(N={len(self)})>"

def __getitem__(self, index):
X = self.X[index]
y = self.y[index]
return [X, y]

def collate_fn(self, batch):
"""Processing on a batch."""
# Get inputs
batch = np.array(batch)
X = np.stack(batch[:, 0], axis=0)
y = batch[:, 1]

# Cast
X = torch.FloatTensor(X.astype(np.float32))
y = torch.LongTensor(y.astype(np.int32))

return X, y

def create_dataloader(self, batch_size, shuffle=False, drop_last=False):
return torch.utils.data.DataLoader(
dataset=self, batch_size=batch_size, collate_fn=self.collate_fn,
shuffle=shuffle, drop_last=drop_last, pin_memory=True)

事实上我们并不需要 collate_fn ,但我们可以让它透明(无副作用),因为当我想要对批处理做一些处理的时候,需要用到这个方法。(如:数据padding)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Create datasets
train_dataset = Dataset(X=X_train, y=y_train)
val_dataset = Dataset(X=X_val, y=y_val)
test_dataset = Dataset(X=X_test, y=y_test)
print ("Datasets:\n"
f" Train dataset:{train_dataset.__str__()}\n"
f" Val dataset: {val_dataset.__str__()}\n"
f" Test dataset: {test_dataset.__str__()}\n"
"Sample point:\n"
f" X: {train_dataset[0][0]}\n"
f" y: {train_dataset[0][1]}")

# Output
# Datasets:
# Train dataset: <Dataset(N=1050)>
# Val dataset: <Dataset(N=225)>
# Test dataset: <Dataset(N=225)>
# Sample point:
# X: [-1.47355106 -1.67417243]
# y: 0

之前的文章中都是利用全部的数据进行梯度计算,然而更标准的做法是 mini-batch 随机梯度下降,也就是将样本分成多个只有 n(BATCH_SIZE) 个样本的 mini-batch。这就是 Dataloader 派上用场的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Create dataloaders
batch_size = 64
train_dataloader = train_dataset.create_dataloader(batch_size=batch_size)
val_dataloader = val_dataset.create_dataloader(batch_size=batch_size)
test_dataloader = test_dataset.create_dataloader(batch_size=batch_size)
batch_X, batch_y = next(iter(train_dataloader))
print ("Sample batch:\n"
f" X: {list(batch_X.size())}\n"
f" y: {list(batch_y.size())}\n"
"Sample point:\n"
f" X: {batch_X[0]}\n"
f" y: {batch_y[0]}")

# Output
# Sample batch:
# X: [64, 2]
# y: [64]
# Sample point:
# X: tensor([ 0.4535, -0.3570], dtype=torch.float64)
# y: 0

Model

我们需要定义一个模型,以便继续给出训练阶段的实用组件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
INPUT_DIM = X_train.shape[1]  # 2D
HIDDEN_DIM = 100
DROPOUT_P = .01
NUM_CLASSES = len(label_encoder.classes)
NUM_EPOCHS = 10


class MLP(nn.Module):
def __init__(self, input_dim, hidden_dim, dropout_p, num_classes):
super(MLP, self).__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.dropout = nn.Dropout(dropout_p)
self.fc2 = nn.Linear(hidden_dim, num_classes)

def forward(self, x_in):
z = F.relu(self.fc1(x_in))
z = self.dropout(z)
z = self.fc2(z)
return z

# Initialize model
model = MLP(input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM,
dropout_p=DROPOUT_P, num_classes=NUM_CLASSES)
model = model.to(device)
print (model.named_parameters)

# Output
# <bound method Module.named_parameters of MLP(
# (fc1): Linear(in_features=2, out_features=100, bias=True)
# (droput): Dropout(p=0.01, inplace=False)
# (fc2): Linear(in_features=100, out_features=3, bias=True)
# )>

Trainer

之前的文章,我们一直在编写只使用循环来训练分割后的训练数据,然后在测试集上评估。

但实际工作中,我们会遵循下面这个过程:

  • 使用mini-batches进行训练
  • 在验证集上评估损失,并更新超参
  • 训练结束后,在测试集上评估模型

所以我们需要创建 Trainer 类来组织这些过程。

首先,train_step 用来执行小批量数据训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def train_step(self, dataloader):
self.model.train()
loss = 0.0

for i, batch in enumerate(dataloader):
batch = [item.to(self.device) for item in batch]
inputs, targets = batch[:-1], batch[-1]
self.optimizer.zero_grad() # reset gradients
z = self.model(inputs) # forward pass
J = self.loss_fn(z, targets)
J.backward() # backward pass
self.optimizer.step() # Update weights

# Cumulative Metrics
loss += (j.detach().item() - loss) / (i + 1)

return loss

然后 eval_step,用于验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def eval_step(self, dataloader):
self.model.eval()
loss = 0.0
y_trues, x_probs = [], []

with torch.inference_model():
for i, batch in enumerate(dataloader):
batch = [item.to(self.device) for item in batch]
inputs, y_trye = batch[:-1], batch[-1]
z = self.model(inputs)
J = self.loss_fn(z, y_true).item()

loss += (J - loss) / (i + 1)

# Store outputs
y_prob = F.softmax(z).cpu().numpy()
y_probs.extend(y_prob)
y_trues.extend(y_true.cpu().numpy())

return loss, np.vstack(y_trues), np.vstack(y_probs)

最后 predict_step, 只是用来对数据进行预测

1
2
3
4
5
6
7
8
9
10
11
12
13
def predict_step(self, dataloader):
self.model.eval()
y_prods = []

with torch.inference_model():
for i, batch in enumerate(dataloader):

inputs, y_trye = batch[:-1], batch[-1]
z = self.model(inputs)
y_prob = F.softmax(z).cpu().numpy()
y_probs.extend(y_prob)

return np.vstack(y_probs)

LR scheduler

我们将向优化器添加一个学习率调度器,以在训练期间调整我们的学习率。

有许多调度器可供选择,但最受欢迎的是 ReduceLROnPlateau ,它在指标(例如:验证损失)停止改进的时候,减少学习率。

1
2
3
4
5
6
7
8
9
# Initialize the LR scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.1, patience=3)

for epoch in range(NUM_EPOCHS * 10):
...
train_loss = trainer.train_step(dataloader=train_dataloader)
val_loss, _, _ = trainer.eval_step(dataloader=val_dataloader)
scheduler.step(val_loss)
...

Early stopping

我们不应该拍脑袋训练足够多的epoch,而是应该有个明确的停止标准。

常见的停止标准,是模型达到一个期望的性能时,即停止训练。

1
2
3
4
5
6
7
8
9
10
11
12

# Early stopping
if val_loss < best_val_loss:
best_val_loss = val_loss
best_model = trainer.model
_patience = patience # reset _patience
else:
_patience -= 1
if not _patience: # 0
print("Stopping early!")
break

Training

现在把上面这些放到一起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from torch.optim import Adam
import torch.nn.functional as F

LEARNING_RATE = 1e-2
NUM_EPOCHS = 100
PATIENCE = 3

# Define Loss
class_weights_tensor = torch.Tensor(list(class_weights.values())).to(device)
loss_fn = nn.CrossEntropyLoss(weight=class_weights_tensor)

# Define optimizer & scheduler
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode="min", factor=0.1, patience=3)


class Trainer(object):
def __init__(self, model, device, loss_fn=None, optimizer=None, scheduler=None):

# Set params
self.model = model
self.device = device
self.loss_fn = loss_fn
self.optimizer = optimizer
self.scheduler = scheduler

def train_step(self, dataloader):
"""Train step."""
# Set model to train mode
self.model.train()
loss = 0.0

# Iterate over train batches
for i, batch in enumerate(dataloader):

# Step
batch = [item.to(self.device) for item in batch] # Set device
inputs, targets = batch[:-1], batch[-1]
self.optimizer.zero_grad() # Reset gradients
z = self.model(inputs) # Forward pass
J = self.loss_fn(z, targets) # Define loss
J.backward() # Backward pass
self.optimizer.step() # Update weights

# Cumulative Metrics
loss += (J.detach().item() - loss) / (i + 1)

return loss

def eval_step(self, dataloader):
"""Validation or test step."""
# Set model to eval mode
self.model.eval()
loss = 0.0
y_trues, y_probs = [], []

# Iterate over val batches
with torch.inference_mode():
for i, batch in enumerate(dataloader):

# Step
batch = [item.to(self.device) for item in batch] # Set device
inputs, y_true = batch[:-1], batch[-1]
z = self.model(inputs) # Forward pass
J = self.loss_fn(z, y_true).item()

# Cumulative Metrics
loss += (J - loss) / (i + 1)

# Store outputs
y_prob = F.softmax(z).cpu().numpy()
y_probs.extend(y_prob)
y_trues.extend(y_true.cpu().numpy())

return loss, np.vstack(y_trues), np.vstack(y_probs)

def predict_step(self, dataloader):
"""Prediction step."""
# Set model to eval mode
self.model.eval()
y_probs = []

# Iterate over val batches
with torch.inference_mode():
for i, batch in enumerate(dataloader):

# Forward pass w/ inputs
inputs, targets = batch[:-1], batch[-1]
z = self.model(inputs)

# Store outputs
y_prob = F.softmax(z).cpu().numpy()
y_probs.extend(y_prob)

return np.vstack(y_probs)

def train(self, num_epochs, patience, train_dataloader, val_dataloader):
best_val_loss = np.inf
for epoch in range(num_epochs):
# Steps
train_loss = self.train_step(dataloader=train_dataloader)
val_loss, _, _ = self.eval_step(dataloader=val_dataloader)
self.scheduler.step(val_loss)

# Early stopping
if val_loss < best_val_loss:
best_val_loss = val_loss
best_model = self.model
_patience = patience # reset _patience
else:
_patience -= 1
if not _patience: # 0
print("Stopping early!")
break

# Logging
print(
f"Epoch: {epoch+1} | "
f"train_loss: {train_loss:.5f}, "
f"val_loss: {val_loss:.5f}, "
f"lr: {self.optimizer.param_groups[0]['lr']:.2E}, "
f"_patience: {_patience}"
)
return best_model

# Trainer module
trainer = Trainer(
model=model, device=device, loss_fn=loss_fn,
optimizer=optimizer, scheduler=scheduler)

# Train
best_model = trainer.train(
NUM_EPOCHS, PATIENCE, train_dataloader, val_dataloader)


# Output
# Epoch: 1 | train_loss: 0.87488, val_loss: 0.66353, lr: 1.00E-02, _patience: 3
# Epoch: 2 | train_loss: 0.66368, val_loss: 0.55748, lr: 1.00E-02, _patience: 3
# ...
# Epoch: 67 | train_loss: 0.03002, val_loss: 0.02305, lr: 1.00E-02, _patience: 3
# Epoch: 68 | train_loss: 0.03011, val_loss: 0.02309, lr: 1.00E-02, _patience: 2
# Epoch: 69 | train_loss: 0.02544, val_loss: 0.02227, lr: 1.00E-02, _patience: 3
# Epoch: 70 | train_loss: 0.02680, val_loss: 0.02154, lr: 1.00E-02, _patience: 3
# Epoch: 71 | train_loss: 0.02897, val_loss: 0.02162, lr: 1.00E-02, _patience: 2
# Epoch: 72 | train_loss: 0.02737, val_loss: 0.02190, lr: 1.00E-02, _patience: 1
# Stopping early!

Evaluation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import json
from sklearn.metrics import precision_recall_fscore_support

def get_metrics(y_true, y_pred, classes):
"""Per-class performance metrics."""
# Performance
performance = {"overall": {}, "class": {}}

# Overall performance
metrics = precision_recall_fscore_support(y_true, y_pred, average="weighted")
performance["overall"]["precision"] = metrics[0]
performance["overall"]["recall"] = metrics[1]
performance["overall"]["f1"] = metrics[2]
performance["overall"]["num_samples"] = np.float64(len(y_true))

# Per-class performance
metrics = precision_recall_fscore_support(y_true, y_pred, average=None)
for i in range(len(classes)):
performance["class"][classes[i]] = {
"precision": metrics[0][i],
"recall": metrics[1][i],
"f1": metrics[2][i],
"num_samples": np.float64(metrics[3][i]),
}

return performance


# Get predictions
test_loss, y_true, y_prob = trainer.eval_step(dataloader=test_dataloader)
y_pred = np.argmax(y_prob, axis=1)

# Determine performance
performance = get_metrics(
y_true=y_test, y_pred=y_pred, classes=label_encoder.classes)
print (json.dumps(performance["overall"], indent=2))

# Output
# {
# "precision": 0.9956140350877192,
# "recall": 0.9955555555555555,
# "f1": 0.9955553580159118,
# "num_samples": 225.0
# }

Saving & loading

我们需要保存一些必要的模型数据,以供后续能够完整的加载和使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from pathlib import Path


# Save artifacts
dir = Path("mlp")
dir.mkdir(parents=True, exist_ok=True)
label_encoder.save(fp=Path(dir, "label_encoder.json"))
X_scaler.save(fp=Path(dir, "X_scaler.json"))
torch.save(best_model.state_dict(), Path(dir, "model.pt"))
with open(Path(dir, 'performance.json'), "w") as fp:
json.dump(performance, indent=2, sort_keys=False, fp=fp)


# Load artifacts
device = torch.device("cpu")
label_encoder = LabelEncoder.load(fp=Path(dir, "label_encoder.json"))
X_scaler = StandardScaler.load(fp=Path(dir, "X_scaler.json"))
model = MLP(
input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM,
dropout_p=DROPOUT_P, num_classes=NUM_CLASSES)
model.load_state_dict(torch.load(Path(dir, "model.pt"), map_location=device))
model.to(device)


# Initialize trainer
trainer = Trainer(model=model, device=device)

# Dataloader
sample = [[0.106737, 0.114197]] # c1
X = X_scaler.scale(sample)
y_filler = label_encoder.encode([label_encoder.classes[0]]*len(X))
dataset = Dataset(X=X, y=y_filler)
dataloader = dataset.create_dataloader(batch_size=batch_size)

# Inference
y_prob = trainer.predict_step(dataloader)
y_pred = np.argmax(y_prob, axis=1)
label_encoder.decode(y_pred)

Ending

本文给出了一个机器学习项目的基本组件, 事实上,还有一些其他的重要组成没有覆盖到。比如:

  • 文本序列化的Tokenizers
  • 表征数据的Encoders
  • 数据padding
  • 实验跟踪及可视化结果
  • 超惨优化
  • 等等

后续我们会继续学习,至少到这里,我们有了入门深度学习的基础了。