TL;DR
This post briefly covers the basic workflow for building a neural network, along with some practical utility components that simplify development.
Set up
We usually need to set several seeds for reproducible experiments, so it is convenient to wrap them all in a single function.
```python
import numpy as np
import pandas as pd
import random
import torch
import torch.nn as nn

def set_seeds(seed=1024):
    """Set seeds for reproducibility."""
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seeds(seed=1024)
```
Device
When we have large datasets and bigger models to train, we can speed things up by parallelizing tensor operations on a GPU.
```python
cuda = True
device = torch.device("cuda" if (torch.cuda.is_available() and cuda) else "cpu")
torch.set_default_tensor_type({"cuda": "torch.cuda.FloatTensor",
                               "cpu": "torch.FloatTensor"}.get(str(device)))
```
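With the default tensor type set as above, newly created float tensors land on the chosen device automatically, and anything created elsewhere can still be moved explicitly with .to(device). A minimal sanity check (a sketch assuming the cell above has run):

```python
# Newly created float tensors follow the default tensor type.
x = torch.zeros(2, 3)
print(x.device)  # "cuda:0" when a GPU is available, otherwise "cpu"

# Tensors (and models) can also be moved to the device explicitly.
y = x.to(device)
print(y.device)
```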
Load data
We will again use the spiral data introduced in the previous post as a demonstration.
```python
import matplotlib.pyplot as plt
import pandas as pd

url = "http://s3.mindex.xyz/datasets/9378f64fc8dd2817e4c92be0a3bae8e7.csv"
df = pd.read_csv(url, header=0)
df = df.sample(frac=1).reset_index(drop=True)
df.head()

X = df[["X1", "X2"]].values
y = df["color"].values
print("X: ", np.shape(X))
print("y: ", np.shape(y))

plt.title("Generated non-linear data")
colors = {"c1": "red", "c2": "yellow", "c3": "blue"}
plt.scatter(X[:, 0], X[:, 1], c=[colors[_y] for _y in y], edgecolors="k", s=25)
plt.show()
```
Split data
```python
import collections
from sklearn.model_selection import train_test_split

TRAIN_SIZE = 0.7
VAL_SIZE = 0.15
TEST_SIZE = 0.15

def train_val_test_split(X, y, train_size):
    """Split a dataset into train, validation and test splits."""
    X_train, X_, y_train, y_ = train_test_split(X, y, train_size=train_size, stratify=y)
    X_test, X_val, y_test, y_val = train_test_split(X_, y_, train_size=0.5, stratify=y_)
    return X_train, X_val, X_test, y_train, y_val, y_test

X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split(
    X=X, y=y, train_size=TRAIN_SIZE)
print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"X_val: {X_val.shape}, y_val: {y_val.shape}")
print(f"X_test: {X_test.shape}, y_test: {y_test.shape}")
print(f"Sample point: {X_train[0]} → {y_train[0]}")
```
Label encoding
Next we define a LabelEncoder to encode the text labels into unique indices.
We no longer use scikit-learn's LabelEncoder here, because we want to be able to save and load our instances exactly the way we want.
```python
import itertools
import json

class LabelEncoder(object):
    """Label encoder for tag labels."""
    def __init__(self, class_to_index=None):
        self.class_to_index = class_to_index or {}
        self.index_to_class = {v: k for k, v in self.class_to_index.items()}
        self.classes = list(self.class_to_index.keys())

    def __len__(self):
        return len(self.class_to_index)

    def __str__(self):
        return f"<LabelEncoder(num_classes={len(self)})>"

    def fit(self, y):
        classes = np.unique(y)
        for i, class_ in enumerate(classes):
            self.class_to_index[class_] = i
        self.index_to_class = {v: k for k, v in self.class_to_index.items()}
        self.classes = list(self.class_to_index.keys())

    def encode(self, y):
        encoded = np.zeros((len(y)), dtype=int)
        for i, item in enumerate(y):
            encoded[i] = self.class_to_index[item]
        return encoded

    def decode(self, y):
        classes = []
        for i, item in enumerate(y):
            classes.append(self.index_to_class[item])
        return classes

    def save(self, fp):
        with open(fp, "w") as fp:
            contents = {"class_to_index": self.class_to_index}
            json.dump(contents, fp, indent=4, sort_keys=False)

    @classmethod
    def load(cls, fp):
        with open(fp, "r") as fp:
            kwargs = json.load(fp=fp)
        return cls(**kwargs)

label_encoder = LabelEncoder()
label_encoder.fit(y_train)
label_encoder.class_to_index

print(f"y_train[0]: {y_train[0]}")
y_train = label_encoder.encode(y_train)
y_val = label_encoder.encode(y_val)
y_test = label_encoder.encode(y_test)
print(f"y_train[0]: {y_train[0]}")

# Class weights (inverse frequency), used later for the weighted loss
counts = np.bincount(y_train)
class_weights = {i: 1.0/count for i, count in enumerate(counts)}
print(f"counts: {counts}\nweights: {class_weights}")
```
Standardize data
We need to standardize our data (zero mean and unit variance) so that the magnitude of any particular feature does not dominate how the model learns its weights.
We only standardize the inputs X, because the outputs y are class values.
We will write our own StandardScaler class so that it can easily be saved and loaded at inference time.
```python
class StandardScaler(object):
    def __init__(self, mean=None, std=None):
        self.mean = np.array(mean)
        self.std = np.array(std)

    def fit(self, X):
        self.mean = np.mean(X, axis=0)
        self.std = np.std(X, axis=0)

    def scale(self, X):
        return (X - self.mean) / self.std

    def unscale(self, X):
        return (X * self.std) + self.mean

    def save(self, fp):
        with open(fp, "w") as fp:
            contents = {"mean": self.mean.tolist(), "std": self.std.tolist()}
            json.dump(contents, fp, indent=4, sort_keys=False)

    @classmethod
    def load(cls, fp):
        with open(fp, "r") as fp:
            kwargs = json.load(fp=fp)
        return cls(**kwargs)

X_scaler = StandardScaler()
X_scaler.fit(X_train)

# Apply the scaler to every split (inputs X only, not the class labels y)
X_train = X_scaler.scale(X_train)
X_val = X_scaler.scale(X_val)
X_test = X_scaler.scale(X_test)

# Check that the test split is (roughly) zero mean and unit variance
print(f"X_test[0]: mean: {np.mean(X_test[:, 0], axis=0):.1f}, std: {np.std(X_test[:, 0], axis=0):.1f}")
print(f"X_test[1]: mean: {np.mean(X_test[:, 1], axis=0):.1f}, std: {np.std(X_test[:, 1], axis=0):.1f}")
```
DataLoader
We will wrap the data in a Dataset and use a DataLoader to efficiently create batches for training and validation.
```python
class Dataset(torch.utils.data.Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.y)

    def __str__(self):
        return f"<Dataset(N={len(self)})>"

    def __getitem__(self, index):
        X = self.X[index]
        y = self.y[index]
        return [X, y]

    def collate_fn(self, batch):
        """Processing on a batch."""
        batch = np.array(batch, dtype=object)
        X = np.stack(batch[:, 0], axis=0)
        y = batch[:, 1]
        X = torch.FloatTensor(X.astype(np.float32))
        y = torch.LongTensor(y.astype(np.int32))
        return X, y

    def create_dataloader(self, batch_size, shuffle=False, drop_last=False):
        return torch.utils.data.DataLoader(
            dataset=self, batch_size=batch_size, collate_fn=self.collate_fn,
            shuffle=shuffle, drop_last=drop_last, pin_memory=True)
```
We don't strictly need a collate_fn here, but we keep it transparent (no side effects) because it becomes useful whenever we want to do some processing on a batch (e.g. data padding), as sketched below.
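For instance, if the inputs were variable-length sequences (such as token indices for text), the collate_fn could pad every sample in a batch to the longest sequence before converting to tensors. The sketch below is illustrative only; the spiral data used here has fixed-length features and does not need it, and the pad_collate_fn name is made up for this example.

```python
def pad_collate_fn(batch):
    """Pad variable-length 1-D sequences in a batch to the longest one."""
    sequences = [np.asarray(x, dtype=np.float32) for x, _ in batch]
    labels = np.asarray([y for _, y in batch])
    max_len = max(len(seq) for seq in sequences)
    padded = np.zeros((len(sequences), max_len), dtype=np.float32)
    for i, seq in enumerate(sequences):
        padded[i, :len(seq)] = seq  # left-aligned, zero-padded on the right
    return torch.FloatTensor(padded), torch.LongTensor(labels)
```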
```python
train_dataset = Dataset(X=X_train, y=y_train)
val_dataset = Dataset(X=X_val, y=y_val)
test_dataset = Dataset(X=X_test, y=y_test)
print("Datasets:\n"
      f"  Train dataset: {train_dataset.__str__()}\n"
      f"  Val dataset: {val_dataset.__str__()}\n"
      f"  Test dataset: {test_dataset.__str__()}\n"
      "Sample point:\n"
      f"  X: {train_dataset[0][0]}\n"
      f"  y: {train_dataset[0][1]}")
```
In the previous posts we computed gradients with the full dataset, but the more standard approach is mini-batch stochastic gradient descent, where the samples are split into mini-batches of n (BATCH_SIZE) samples each. This is where the DataLoader comes in.
```python
batch_size = 64
train_dataloader = train_dataset.create_dataloader(batch_size=batch_size)
val_dataloader = val_dataset.create_dataloader(batch_size=batch_size)
test_dataloader = test_dataset.create_dataloader(batch_size=batch_size)
batch_X, batch_y = next(iter(train_dataloader))
print("Sample batch:\n"
      f"  X: {list(batch_X.size())}\n"
      f"  y: {list(batch_y.size())}\n"
      "Sample point:\n"
      f"  X: {batch_X[0]}\n"
      f"  y: {batch_y[0]}")
```
Model
We need to define a model before moving on to the utility components of the training stage.
```python
import torch.nn.functional as F

INPUT_DIM = X_train.shape[1]
HIDDEN_DIM = 100
DROPOUT_P = .01
NUM_CLASSES = len(label_encoder.classes)
NUM_EPOCHS = 10

class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, dropout_p, num_classes):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout_p)
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, inputs):
        x_in, = inputs  # the Trainer below passes a list of input tensors
        z = F.relu(self.fc1(x_in))
        z = self.dropout(z)
        z = self.fc2(z)
        return z

model = MLP(input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM,
            dropout_p=DROPOUT_P, num_classes=NUM_CLASSES)
model = model.to(device)
print(model.named_parameters)
```
Trainer
In previous posts, we trained with a simple loop over the training split and then evaluated on the test set.
In practice, however, we follow this process:
Train using mini-batches
Evaluate the loss on the validation split and tune hyperparameters
After training, evaluate the model on the test split
So we create a Trainer class to organize these steps.
First, train_step performs training on mini-batches of data:
```python
def train_step(self, dataloader):
    """Train step."""
    self.model.train()
    loss = 0.0

    # Iterate over train batches
    for i, batch in enumerate(dataloader):
        batch = [item.to(self.device) for item in batch]
        inputs, targets = batch[:-1], batch[-1]
        self.optimizer.zero_grad()    # reset gradients
        z = self.model(inputs)        # forward pass
        J = self.loss_fn(z, targets)  # loss
        J.backward()                  # backward pass
        self.optimizer.step()         # update weights

        # Cumulative (running mean) loss
        loss += (J.detach().item() - loss) / (i + 1)

    return loss
```
Then eval_step, used for validation:
```python
def eval_step(self, dataloader):
    """Validation or test step."""
    self.model.eval()
    loss = 0.0
    y_trues, y_probs = [], []

    # Iterate over val batches
    with torch.inference_mode():
        for i, batch in enumerate(dataloader):
            batch = [item.to(self.device) for item in batch]
            inputs, y_true = batch[:-1], batch[-1]
            z = self.model(inputs)
            J = self.loss_fn(z, y_true).item()

            # Cumulative (running mean) loss
            loss += (J - loss) / (i + 1)

            # Store outputs
            y_prob = F.softmax(z, dim=1).cpu().numpy()
            y_probs.extend(y_prob)
            y_trues.extend(y_true.cpu().numpy())

    return loss, np.vstack(y_trues), np.vstack(y_probs)
```
Finally predict_step, which is only used to make predictions on data:
```python
def predict_step(self, dataloader):
    """Prediction step."""
    self.model.eval()
    y_probs = []

    # Iterate over batches
    with torch.inference_mode():
        for i, batch in enumerate(dataloader):
            inputs, targets = batch[:-1], batch[-1]
            z = self.model(inputs)

            # Store outputs
            y_prob = F.softmax(z, dim=1).cpu().numpy()
            y_probs.extend(y_prob)

    return np.vstack(y_probs)
```
LR scheduler
We will add a learning rate scheduler to the optimizer to adjust the learning rate during training.
There are many schedulers to choose from, but the most popular is ReduceLROnPlateau, which reduces the learning rate when a metric (e.g. validation loss) stops improving.
```python
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.1, patience=3)

for epoch in range(NUM_EPOCHS * 10):
    ...
    train_loss = trainer.train_step(dataloader=train_dataloader)
    val_loss, _, _ = trainer.eval_step(dataloader=val_dataloader)
    scheduler.step(val_loss)
    ...
```
Early stopping
Rather than training for some arbitrary number of epochs, we should have an explicit stopping criterion.
A common criterion is to stop once validation performance stops improving: we track the best validation loss and stop after it fails to improve for a certain number of epochs (patience).
```python
if val_loss < best_val_loss:
    best_val_loss = val_loss
    best_model = trainer.model
    _patience = patience  # reset patience
else:
    _patience -= 1
if not _patience:
    print("Stopping early!")
    break
```
Training
Now let's put all of the above together.
```python
from torch.optim import Adam
import torch.nn.functional as F

LEARNING_RATE = 1e-2
NUM_EPOCHS = 100
PATIENCE = 3

# Loss (weighted by inverse class frequency), optimizer and scheduler
class_weights_tensor = torch.Tensor(list(class_weights.values())).to(device)
loss_fn = nn.CrossEntropyLoss(weight=class_weights_tensor)
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.1, patience=3)

class Trainer(object):
    def __init__(self, model, device, loss_fn=None, optimizer=None, scheduler=None):
        self.model = model
        self.device = device
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.scheduler = scheduler

    def train_step(self, dataloader):
        """Train step."""
        self.model.train()
        loss = 0.0
        for i, batch in enumerate(dataloader):
            batch = [item.to(self.device) for item in batch]
            inputs, targets = batch[:-1], batch[-1]
            self.optimizer.zero_grad()
            z = self.model(inputs)
            J = self.loss_fn(z, targets)
            J.backward()
            self.optimizer.step()
            loss += (J.detach().item() - loss) / (i + 1)
        return loss

    def eval_step(self, dataloader):
        """Validation or test step."""
        self.model.eval()
        loss = 0.0
        y_trues, y_probs = [], []
        with torch.inference_mode():
            for i, batch in enumerate(dataloader):
                batch = [item.to(self.device) for item in batch]
                inputs, y_true = batch[:-1], batch[-1]
                z = self.model(inputs)
                J = self.loss_fn(z, y_true).item()
                loss += (J - loss) / (i + 1)
                y_prob = F.softmax(z, dim=1).cpu().numpy()
                y_probs.extend(y_prob)
                y_trues.extend(y_true.cpu().numpy())
        return loss, np.vstack(y_trues), np.vstack(y_probs)

    def predict_step(self, dataloader):
        """Prediction step."""
        self.model.eval()
        y_probs = []
        with torch.inference_mode():
            for i, batch in enumerate(dataloader):
                inputs, targets = batch[:-1], batch[-1]
                z = self.model(inputs)
                y_prob = F.softmax(z, dim=1).cpu().numpy()
                y_probs.extend(y_prob)
        return np.vstack(y_probs)

    def train(self, num_epochs, patience, train_dataloader, val_dataloader):
        best_val_loss = np.inf
        for epoch in range(num_epochs):
            # Steps
            train_loss = self.train_step(dataloader=train_dataloader)
            val_loss, _, _ = self.eval_step(dataloader=val_dataloader)
            self.scheduler.step(val_loss)

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_model = self.model
                _patience = patience  # reset patience
            else:
                _patience -= 1
            if not _patience:
                print("Stopping early!")
                break

            # Logging
            print(
                f"Epoch: {epoch+1} | "
                f"train_loss: {train_loss:.5f}, "
                f"val_loss: {val_loss:.5f}, "
                f"lr: {self.optimizer.param_groups[0]['lr']:.2E}, "
                f"_patience: {_patience}"
            )
        return best_model

trainer = Trainer(
    model=model, device=device, loss_fn=loss_fn,
    optimizer=optimizer, scheduler=scheduler)
best_model = trainer.train(
    NUM_EPOCHS, PATIENCE, train_dataloader, val_dataloader)
```
Evaluation
```python
import json
from sklearn.metrics import precision_recall_fscore_support

def get_metrics(y_true, y_pred, classes):
    """Per-class performance metrics."""
    performance = {"overall": {}, "class": {}}

    # Overall performance
    metrics = precision_recall_fscore_support(y_true, y_pred, average="weighted")
    performance["overall"]["precision"] = metrics[0]
    performance["overall"]["recall"] = metrics[1]
    performance["overall"]["f1"] = metrics[2]
    performance["overall"]["num_samples"] = np.float64(len(y_true))

    # Per-class performance
    metrics = precision_recall_fscore_support(y_true, y_pred, average=None)
    for i in range(len(classes)):
        performance["class"][classes[i]] = {
            "precision": metrics[0][i],
            "recall": metrics[1][i],
            "f1": metrics[2][i],
            "num_samples": np.float64(metrics[3][i]),
        }

    return performance

test_loss, y_true, y_prob = trainer.eval_step(dataloader=test_dataloader)
y_pred = np.argmax(y_prob, axis=1)
performance = get_metrics(
    y_true=y_test, y_pred=y_pred, classes=label_encoder.classes)
print(json.dumps(performance["overall"], indent=2))
```
Saving & loading
We need to save the necessary model artifacts so that everything can be fully loaded and used later.
```python
from pathlib import Path

# Save artifacts
dir = Path("mlp")
dir.mkdir(parents=True, exist_ok=True)
label_encoder.save(fp=Path(dir, "label_encoder.json"))
X_scaler.save(fp=Path(dir, "X_scaler.json"))
torch.save(best_model.state_dict(), Path(dir, "model.pt"))
with open(Path(dir, "performance.json"), "w") as fp:
    json.dump(performance, indent=2, sort_keys=False, fp=fp)

# Load artifacts
device = torch.device("cpu")
label_encoder = LabelEncoder.load(fp=Path(dir, "label_encoder.json"))
X_scaler = StandardScaler.load(fp=Path(dir, "X_scaler.json"))
model = MLP(
    input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM,
    dropout_p=DROPOUT_P, num_classes=NUM_CLASSES)
model.load_state_dict(torch.load(Path(dir, "model.pt"), map_location=device))
model.to(device)

# Inference on a new sample
trainer = Trainer(model=model, device=device)
sample = [[0.106737, 0.114197]]
X = X_scaler.scale(sample)
y_filler = label_encoder.encode([label_encoder.classes[0]]*len(X))
dataset = Dataset(X=X, y=y_filler)
dataloader = dataset.create_dataloader(batch_size=batch_size)
y_prob = trainer.predict_step(dataloader)
y_pred = np.argmax(y_prob, axis=1)
label_encoder.decode(y_pred)
```
Ending
This post has presented the basic components of a machine learning project. There are still some important pieces we haven't covered, for example:
Tokenizers for serializing text
Encoders for representing data
Data padding
Experiment tracking and result visualization
Hyperparameter optimization
And so on
We will cover these in later posts; for now, we have the foundations needed to get started with deep learning.