Set up
import random

import numpy as np
import pandas as pd
import torch
import torch.nn as nn


def set_seeds(seed=1024):
    """Set seeds for reproducibility.

    Seeds NumPy, Python's ``random`` module and PyTorch (CPU and CUDA
    generators) with the same value.

    Args:
        seed: Integer seed applied to every generator.
    """
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)  # was misspelled as `touch`, which raised NameError
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # covers the multi-GPU case


# Seed everything once up front.
set_seeds(seed=1024)
当我们有大型数据集和更大的模型要训练时,我们可以通过在 GPU 上并行化张量操作来加速。
# Flip to False to stay on the CPU even when a GPU is present.
cuda = True

# Use the GPU only when one is available and it has been requested.
if torch.cuda.is_available() and cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Make newly created tensors default to the selected device's float type.
_DEFAULT_TENSOR_TYPES = {
    "cuda": "torch.cuda.FloatTensor",
    "cpu": "torch.FloatTensor",
}
torch.set_default_tensor_type(_DEFAULT_TENSOR_TYPES.get(str(device)))
Load data
import matplotlib.pyplot as plt
import pandas as pd

# NOTE(review): the dataset URL is empty here (it looks like it was lost when
# the article was extracted) -- fill in the CSV location before running.
url = ""

# Read the CSV (first row is the header) and shuffle the rows.
df = pd.read_csv(url, header=0)
df = df.sample(frac=1).reset_index(drop=True)
df.head()

# Features are the two coordinate columns; the label is the "color" column.
X = df[["X1", "X2"]].values
y = df["color"].values
print("X: ", np.shape(X))
print("y: ", np.shape(y))

# Scatter plot of the points, colored by class.
plt.title("Generated non-linear data")
colors = {"c1": "red", "c2": "yellow", "c3": "blue"}
plt.scatter(
    X[:, 0],
    X[:, 1],
    c=[colors[_y] for _y in y],
    edgecolors="k",
    s=25,
)
Split data
import collections

from sklearn.model_selection import train_test_split

TRAIN_SIZE = 0.7
VAL_SIZE = 0.15
TEST_SIZE = 0.15


def train_val_test_split(X, y, train_size):
    """Split a dataset into stratified train, validation and test subsets.

    The data is first split into a training part and a remainder; the
    remainder is then split in half into the test and validation parts.

    Args:
        X: Feature array.
        y: Labels, used for stratification in both splits.
        train_size: Fraction of the data assigned to the training subset.

    Returns:
        Tuple ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    """
    # Honor the caller's `train_size` (the global TRAIN_SIZE used to be read
    # here, so the argument was silently ignored).
    X_train, X_, y_train, y_ = train_test_split(
        X, y, train_size=train_size, stratify=y)
    X_test, X_val, y_test, y_val = train_test_split(
        X_, y_, train_size=0.5, stratify=y_)
    return X_train, X_val, X_test, y_train, y_val, y_test


# Create the data splits.
X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split(
    X=X, y=y, train_size=TRAIN_SIZE)
print(f"X_train: {X_train.shape} , y_train: {y_train.shape} ")
print(f"X_val: {X_val.shape} , y_val: {y_val.shape} ")
print(f"X_test: {X_test.shape} , y_test: {y_test.shape} ")
print(f"Sample point: {X_train[0]} → {y_train[0]} ")
Label encoding
接下来定义一个 LabelEncoder 来将文本标签编码成唯一的索引。
这里不再使用 scikit-learn 的 LabelEncoder,因为我们希望能够以我们想要的方式保存和加载我们的实例。
import itertools
import json


class LabelEncoder(object):
    """Label encoder for tag labels.

    Maps each distinct class label to an integer index and back, and can be
    saved to / loaded from a JSON file.
    """

    def __init__(self, class_to_index=None):
        """Create an encoder, optionally from an existing label->index mapping."""
        self.class_to_index = class_to_index or {}
        self.index_to_class = {v: k for k, v in self.class_to_index.items()}
        self.classes = list(self.class_to_index.keys())

    def __len__(self):
        """Return the number of known classes."""
        return len(self.class_to_index)

    def __str__(self):
        return f"<LabelEncoder(num_classes={len(self)} >"

    def fit(self, y):
        """Learn the label->index mapping from the labels in `y`.

        Indices follow the sorted order of the unique labels.
        """
        # .tolist() yields native Python values, so the mapping stays
        # JSON-serializable in `save` even for numeric labels.
        classes = np.unique(y).tolist()
        for i, class_ in enumerate(classes):
            self.class_to_index[class_] = i
        self.index_to_class = {v: k for k, v in self.class_to_index.items()}
        self.classes = list(self.class_to_index.keys())

    def encode(self, y):
        """Return an integer array with the index of every label in `y`.

        Raises:
            KeyError: If a label was not seen by `fit`.
        """
        return np.array([self.class_to_index[item] for item in y], dtype=int)

    def decode(self, y):
        """Return the list of class labels for the indices in `y`."""
        return [self.index_to_class[item] for item in y]

    def save(self, fp):
        """Write the label->index mapping to the JSON file at `fp`."""
        contents = {'class_to_index': self.class_to_index}
        with open(fp, "w") as f:  # separate name: don't shadow the path argument
            json.dump(contents, f, indent=4, sort_keys=False)

    @classmethod
    def load(cls, fp):
        """Build an encoder from a JSON file written by `save`."""
        with open(fp, "r") as f:
            kwargs = json.load(fp=f)
        return cls(**kwargs)


# Fit on the training labels only. Without this call the mapping is empty and
# every `encode` below raises KeyError.
label_encoder = LabelEncoder()
label_encoder.fit(y_train)
label_encoder.class_to_index

# Convert labels to integer indices.
print(f"y_train[0]: {y_train[0]} ")
y_train = label_encoder.encode(y_train)
y_val = label_encoder.encode(y_val)
y_test = label_encoder.encode(y_test)
print(f"y_train[0]: {y_train[0]} ")

# Class weights: inverse of each class's frequency in the training split.
counts = np.bincount(y_train)
class_weights = {i: 1.0 / count for i, count in enumerate(counts)}
print(f"counts: {counts} \nweights: {class_weights} ")
Standardize data
我们将编写自己的 StandardScaler 类,以便在推理过程中轻松保存和加载它。
import json


class StandardScaler(object):
    """Standardize features with a stored mean and standard deviation.

    The statistics can be saved to / loaded from a JSON file so that the same
    scaling is available at inference time.
    """

    def __init__(self, mean=None, std=None):
        """Create a scaler, optionally from precomputed statistics."""
        self.mean = np.array(mean)
        self.std = np.array(std)

    def fit(self, X):
        """Compute the per-column mean and standard deviation of `X`."""
        # Use the argument (the global X_train used to be read here, so the
        # `X` parameter was ignored).
        self.mean = np.mean(X, axis=0)
        self.std = np.std(X, axis=0)

    def scale(self, X):
        """Return `X` standardized with the stored statistics."""
        return (X - self.mean) / self.std

    def unscale(self, X):
        """Invert `scale`."""
        return (X * self.std) + self.mean

    def save(self, fp):
        """Write the mean and std to the JSON file at `fp`."""
        contents = {"mean": self.mean.tolist(), "std": self.std.tolist()}
        with open(fp, "w") as f:  # separate name: don't shadow the path argument
            json.dump(contents, f, indent=4, sort_keys=False)

    @classmethod
    def load(cls, fp):
        """Build a scaler from a JSON file written by `save`."""
        with open(fp, "r") as f:
            kwargs = json.load(fp=f)
        return cls(**kwargs)


# Fit on the training split only, then apply the same scaling to every split.
# NOTE(review): the fit/scale calls were missing from this snippet, leaving the
# scaler with mean/std of None. They are restored here because inference later
# scales its input with this scaler -- confirm against the intended pipeline.
X_scaler = StandardScaler()
X_scaler.fit(X_train)
X_train = X_scaler.scale(X_train)
X_val = X_scaler.scale(X_val)
X_test = X_scaler.scale(X_test)

# Check the per-feature statistics of the test split.
print(f"X_test[0]: mean: {np.mean(X_test[:, 0], axis=0):.1f} , std: {np.std(X_test[:, 0], axis=0):.1f} ")
print(f"X_test[1]: mean: {np.mean(X_test[:, 1], axis=0):.1f} , std: {np.std(X_test[:, 1], axis=0):.1f} ")
我们将把数据放在 Dataset 中,并使用 DataLoader 来有效地创建用于训练和验证的批次。
class Dataset(torch.utils.data.Dataset):
    """In-memory dataset of (features, label) pairs.

    NOTE(review): the base class and the DataLoader constructor were missing
    from this snippet; they are restored as ``torch.utils.data.Dataset`` and
    ``torch.utils.data.DataLoader``, which match the keyword arguments used in
    `create_dataloader`.
    """

    def __init__(self, X, y):
        """Store the features `X` and the labels `y` (indexed in parallel)."""
        self.X = X
        self.y = y

    def __len__(self):
        """Return the number of samples."""
        return len(self.y)

    def __str__(self):
        return f"<Dataset(N={len(self)} )>"

    def __getitem__(self, index):
        """Return the ``[X, y]`` pair at `index`."""
        X = self.X[index]
        y = self.y[index]
        return [X, y]

    def collate_fn(self, batch):
        """Processing on a batch.

        Args:
            batch: List of ``[X, y]`` pairs as returned by `__getitem__`.

        Returns:
            Tuple ``(X, y)``: a float32 tensor of stacked features and an
            int64 tensor of labels.
        """
        # Split the pairs explicitly: building one array from ragged [X, y]
        # pairs raises an inhomogeneous-shape error on recent NumPy releases.
        features = np.stack([sample[0] for sample in batch], axis=0)
        labels = np.array([sample[1] for sample in batch])

        # from_numpy keeps the batch on the CPU regardless of the default
        # tensor type.
        X = torch.from_numpy(features.astype(np.float32))
        y = torch.from_numpy(labels.astype(np.int64))
        return X, y

    def create_dataloader(self, batch_size, shuffle=False, drop_last=False):
        """Return a DataLoader over this dataset that batches with `collate_fn`."""
        return torch.utils.data.DataLoader(
            dataset=self,
            batch_size=batch_size,
            collate_fn=self.collate_fn,
            shuffle=shuffle,
            drop_last=drop_last,
            pin_memory=True,
        )
事实上,这里我们并不需要自定义 collate_fn,但为了让批处理过程更透明,我们仍然把它显式地写了出来(它本身没有副作用);当我们需要对一个批次做额外处理(例如对数据进行 padding)时,就会用到这个方法。
# Wrap each split in a Dataset.
train_dataset = Dataset(X=X_train, y=y_train)
val_dataset = Dataset(X=X_val, y=y_val)
test_dataset = Dataset(X=X_test, y=y_test)

# Summarize the three datasets and show the first training sample.
print(
    "Datasets:\n"
    f" Train dataset:{str(train_dataset)} \n"
    f" Val dataset: {str(val_dataset)} \n"
    f" Test dataset: {str(test_dataset)} \n"
    "Sample point:\n"
    f" X: {train_dataset[0][0]} \n"
    f" y: {train_dataset[0][1]} "
)
之前的文章中都是利用全部的数据进行梯度计算,然而更标准的做法是 mini-batch 随机梯度下降,也就是将样本分成多个只有 n(BATCH_SIZE) 个样本的 mini-batch。这就是 Dataloader 派上用场的地方。
# One DataLoader per split, all using the same mini-batch size.
batch_size = 64
train_dataloader = train_dataset.create_dataloader(batch_size=batch_size)
val_dataloader = val_dataset.create_dataloader(batch_size=batch_size)
test_dataloader = test_dataset.create_dataloader(batch_size=batch_size)

# Pull the first training batch to inspect its shapes and first sample.
batch_X, batch_y = next(iter(train_dataloader))
print(
    "Sample batch:\n"
    f" X: {list(batch_X.shape)} \n"
    f" y: {list(batch_y.shape)} \n"
    "Sample point:\n"
    f" X: {batch_X[0]} \n"
    f" y: {batch_y[0]} "
)
import torch.nn.functional as F

# Model hyperparameters.
INPUT_DIM = X_train.shape[1]
HIDDEN_DIM = 100
DROPOUT_P = .01
NUM_CLASSES = len(label_encoder.classes)
NUM_EPOCHS = 10


class MLP(nn.Module):
    """Two-layer perceptron: linear -> ReLU -> dropout -> linear."""

    def __init__(self, input_dim, hidden_dim, dropout_p, num_classes):
        """Build the layers.

        Args:
            input_dim: Number of input features.
            hidden_dim: Width of the hidden layer.
            dropout_p: Dropout probability applied after the hidden layer.
            num_classes: Number of output logits.
        """
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout_p)
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, x_in):
        """Return the unnormalized class scores (logits) for `x_in`.

        `x_in` may be the input tensor itself or a one-element list/tuple
        holding it, which is what the trainer steps pass (``batch[:-1]``).
        """
        if isinstance(x_in, (list, tuple)):
            (x_in,) = x_in
        z = F.relu(self.fc1(x_in))
        z = self.dropout(z)
        z = self.fc2(z)
        return z


# Initialize the model and move it to the selected device.
model = MLP(
    input_dim=INPUT_DIM,
    hidden_dim=HIDDEN_DIM,
    dropout_p=DROPOUT_P,
    num_classes=NUM_CLASSES,
)
model = model.to(device)
print(model.named_parameters)
所以我们需要创建 Trainer 类来组织这些过程。
首先,train_step 用来执行小批量数据训练
def train_step(self, dataloader):
    """Run one training pass over `dataloader` and return the mean batch loss.

    Reads ``self.model``, ``self.device``, ``self.loss_fn`` and
    ``self.optimizer``. Each batch is a sequence whose last item is the
    targets and whose preceding items are the model inputs.
    """
    self.model.train()  # training mode
    loss = 0.0

    for i, batch in enumerate(dataloader):
        batch = [item.to(self.device) for item in batch]  # move to device
        inputs, targets = batch[:-1], batch[-1]
        self.optimizer.zero_grad()  # reset gradients
        z = self.model(*inputs)  # unpack: the model takes tensors, not a list
        J = self.loss_fn(z, targets)
        J.backward()
        self.optimizer.step()

        # Running mean of the batch losses.
        loss += (J.detach().item() - loss) / (i + 1)

    return loss
然后 eval_step,用于验证
def eval_step(self, dataloader):
    """Evaluate the model on `dataloader` without updating it.

    Reads ``self.model``, ``self.device`` and ``self.loss_fn``.

    Returns:
        Tuple ``(loss, y_trues, y_probs)``: the mean batch loss, the stacked
        true labels and the stacked predicted probabilities.
    """
    self.model.eval()  # evaluation mode
    loss = 0.0
    y_trues, y_probs = [], []

    with torch.inference_mode():
        for i, batch in enumerate(dataloader):
            batch = [item.to(self.device) for item in batch]  # move to device
            inputs, y_true = batch[:-1], batch[-1]
            z = self.model(*inputs)  # unpack: the model takes tensors, not a list
            J = self.loss_fn(z, y_true).item()

            # Running mean of the batch losses.
            loss += (J - loss) / (i + 1)

            # Explicit dim: assumes logits are (batch, num_classes) -- TODO confirm.
            y_prob = F.softmax(z, dim=1).cpu().numpy()
            y_probs.extend(y_prob)
            y_trues.extend(y_true.cpu().numpy())

    return loss, np.vstack(y_trues), np.vstack(y_probs)
最后 predict_step, 只是用来对数据进行预测
def predict_step(self, dataloader):
    """Return the stacked predicted probabilities for every sample in `dataloader`.

    Reads ``self.model`` and ``self.device``. The last item of each batch
    (the targets slot) is ignored.
    """
    self.model.eval()  # evaluation mode
    y_probs = []

    with torch.inference_mode():
        for i, batch in enumerate(dataloader):
            # Move to the model's device, as the train/eval steps do.
            batch = [item.to(self.device) for item in batch]
            inputs, targets = batch[:-1], batch[-1]
            z = self.model(*inputs)  # unpack: the model takes tensors, not a list

            # Explicit dim: assumes logits are (batch, num_classes) -- TODO confirm.
            y_prob = F.softmax(z, dim=1).cpu().numpy()
            y_probs.extend(y_prob)

    return np.vstack(y_probs)
LR scheduler
有许多调度器可供选择,其中最常用的是 ReduceLROnPlateau:当某个指标(例如验证损失)停止改善时,它会降低学习率。
# Multiply the learning rate by `factor` once the monitored value has stopped
# decreasing for `patience` epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.1,
    patience=3,
)

for epoch in range(NUM_EPOCHS * 10):
    ...
    train_loss = trainer.train_step(dataloader=train_dataloader)
    val_loss, _, _ = trainer.eval_step(dataloader=val_dataloader)
    scheduler.step(val_loss)  # the scheduler monitors the validation loss
    ...
Early stopping
1 2 3 4 5 6 7 8 9 10 11 12 if val_loss < best_val_loss: best_val_loss = val_loss best_model = trainer.model _patience = patience else : _patience -= 1 if not _patience: print ("Stopping early!" ) break
import copy

from torch.optim import Adam
import torch.nn.functional as F

LEARNING_RATE = 1e-2
NUM_EPOCHS = 100
PATIENCE = 3

# Loss weighted by the per-class weights.
class_weights_tensor = torch.Tensor(list(class_weights.values())).to(device)
loss_fn = nn.CrossEntropyLoss(weight=class_weights_tensor)

# Optimizer and learning-rate scheduler.
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.1, patience=3)


class Trainer(object):
    """Bundle a model with its loss, optimizer and scheduler for train/eval/predict."""

    def __init__(self, model, device, loss_fn=None, optimizer=None, scheduler=None):
        """Store the training components.

        `loss_fn`, `optimizer` and `scheduler` may be omitted for a trainer
        that is only used with `predict_step`.
        """
        self.model = model
        self.device = device
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.scheduler = scheduler

    def train_step(self, dataloader):
        """Train step: one pass over `dataloader`; returns the mean batch loss."""
        self.model.train()  # training mode
        loss = 0.0

        for i, batch in enumerate(dataloader):
            batch = [item.to(self.device) for item in batch]  # move to device
            inputs, targets = batch[:-1], batch[-1]
            self.optimizer.zero_grad()  # reset gradients
            z = self.model(*inputs)  # unpack: the model takes tensors, not a list
            J = self.loss_fn(z, targets)
            J.backward()
            self.optimizer.step()

            # Running mean of the batch losses.
            loss += (J.detach().item() - loss) / (i + 1)

        return loss

    def eval_step(self, dataloader):
        """Validation or test step.

        Returns:
            Tuple ``(loss, y_trues, y_probs)``: the mean batch loss, the
            stacked true labels and the stacked predicted probabilities.
        """
        self.model.eval()  # evaluation mode
        loss = 0.0
        y_trues, y_probs = [], []

        with torch.inference_mode():
            for i, batch in enumerate(dataloader):
                batch = [item.to(self.device) for item in batch]  # move to device
                inputs, y_true = batch[:-1], batch[-1]
                z = self.model(*inputs)
                J = self.loss_fn(z, y_true).item()

                # Running mean of the batch losses.
                loss += (J - loss) / (i + 1)

                # Explicit dim: assumes logits are (batch, num_classes) -- TODO confirm.
                y_prob = F.softmax(z, dim=1).cpu().numpy()
                y_probs.extend(y_prob)
                y_trues.extend(y_true.cpu().numpy())

        return loss, np.vstack(y_trues), np.vstack(y_probs)

    def predict_step(self, dataloader):
        """Prediction step: returns the stacked predicted probabilities."""
        self.model.eval()  # evaluation mode
        y_probs = []

        with torch.inference_mode():
            for i, batch in enumerate(dataloader):
                # Move to the model's device, as the train/eval steps do.
                batch = [item.to(self.device) for item in batch]
                inputs, targets = batch[:-1], batch[-1]
                z = self.model(*inputs)

                # Explicit dim: assumes logits are (batch, num_classes) -- TODO confirm.
                y_prob = F.softmax(z, dim=1).cpu().numpy()
                y_probs.extend(y_prob)

        return np.vstack(y_probs)

    def train(self, num_epochs, patience, train_dataloader, val_dataloader):
        """Train with LR scheduling and early stopping.

        Args:
            num_epochs: Maximum number of epochs.
            patience: Number of consecutive epochs without a new best
                validation loss after which training stops.
            train_dataloader: Batches used by `train_step`.
            val_dataloader: Batches used by `eval_step`.

        Returns:
            The model with the lowest validation loss seen during training.
        """
        best_val_loss = np.inf
        # Initialized up front so both names are bound even if the first
        # epoch does not improve on `inf` (e.g. a NaN loss) or num_epochs is 0.
        best_model = self.model
        _patience = patience

        for epoch in range(num_epochs):
            train_loss = self.train_step(dataloader=train_dataloader)
            val_loss, _, _ = self.eval_step(dataloader=val_dataloader)
            if self.scheduler is not None:
                self.scheduler.step(val_loss)

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                # Snapshot the weights: a plain reference would keep changing
                # as training continues.
                best_model = copy.deepcopy(self.model)
                _patience = patience  # reset the countdown
            else:
                _patience -= 1
            if not _patience:  # countdown reached 0
                print("Stopping early!")
                break

            # Logging
            print(
                f"Epoch: {epoch+1} | "
                f"train_loss: {train_loss:.5f} , "
                f"val_loss: {val_loss:.5f} , "
                f"lr: {self.optimizer.param_groups[0]['lr']:.2E} , "
                f"_patience: {_patience} "
            )
        return best_model


# Build the trainer and run training.
trainer = Trainer(
    model=model, device=device, loss_fn=loss_fn,
    optimizer=optimizer, scheduler=scheduler)
best_model = trainer.train(
    NUM_EPOCHS, PATIENCE, train_dataloader, val_dataloader)
import json

from sklearn.metrics import precision_recall_fscore_support


def get_metrics(y_true, y_pred, classes):
    """Per-class performance metrics.

    Returns a dict with an "overall" entry (weighted precision, recall, f1
    and the sample count) and a "class" entry holding the same four values
    for each name in `classes`.
    """
    # Weighted averages over all classes.
    overall = precision_recall_fscore_support(y_true, y_pred, average="weighted")
    performance = {
        "overall": {
            "precision": overall[0],
            "recall": overall[1],
            "f1": overall[2],
            "num_samples": np.float64(len(y_true)),
        },
        "class": {},
    }

    # Unaveraged metrics: one value per class, in the order of `classes`.
    per_class = precision_recall_fscore_support(y_true, y_pred, average=None)
    for idx, class_name in enumerate(classes):
        performance["class"][class_name] = {
            "precision": per_class[0][idx],
            "recall": per_class[1][idx],
            "f1": per_class[2][idx],
            "num_samples": np.float64(per_class[3][idx]),
        }

    return performance


# Evaluate on the test split: the predicted class is the most probable one.
test_loss, y_true, y_prob = trainer.eval_step(dataloader=test_dataloader)
y_pred = np.argmax(y_prob, axis=1)

performance = get_metrics(
    y_true=y_test, y_pred=y_pred, classes=label_encoder.classes)
print(json.dumps(performance["overall"], indent=2))
Saving & loading
from pathlib import Path

# NOTE: `dir` shadows the builtin; the name is kept in case later code uses it.
dir = Path("mlp")
dir.mkdir(parents=True, exist_ok=True)

# Save the artifacts.
# NOTE(review): the three save calls and the checkpoint filename were garbled
# in this snippet. They are reconstructed from the load calls below, and
# "model.pt" is a chosen filename -- confirm against the original article.
label_encoder.save(fp=Path(dir, "label_encoder.json"))
X_scaler.save(fp=Path(dir, "X_scaler.json"))
torch.save(model.state_dict(), Path(dir, "model.pt"))
with open(Path(dir, 'performance.json'), "w") as fp:
    json.dump(performance, indent=2, sort_keys=False, fp=fp)

# Load the artifacts back for inference on the CPU.
device = torch.device("cpu")
label_encoder = LabelEncoder.load(fp=Path(dir, "label_encoder.json"))
X_scaler = StandardScaler.load(fp=Path(dir, "X_scaler.json"))
model = MLP(
    input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM,
    dropout_p=DROPOUT_P, num_classes=NUM_CLASSES)
model.load_state_dict(torch.load(Path(dir, "model.pt"), map_location=device))
model.to(device)  # keep the model on the same device as the inputs

# A trainer without loss/optimizer is enough for prediction.
trainer = Trainer(model=model, device=device)

# Build a one-sample dataloader; the labels are fillers that prediction ignores.
sample = [[0.106737, 0.114197]]
X = X_scaler.scale(sample)
y_filler = label_encoder.encode([label_encoder.classes[0]] * len(X))
dataset = Dataset(X=X, y=y_filler)
dataloader = dataset.create_dataloader(batch_size=batch_size)

# Predict and map the most probable class index back to its label.
y_prob = trainer.predict_step(dataloader)
y_pred = np.argmax(y_prob, axis=1)
label_encoder.decode(y_pred)
本文给出了一个机器学习项目的基本组件, 事实上,还有一些其他的重要组成没有覆盖到。比如: