Implementing a BP Neural Network in NumPy (with Dropout, BN, and Other Training Tricks)
BP Neural Network
Introduction
This article trains fully connected neural networks on the MNIST dataset and compares training with and without input normalization, with and without Dropout, and with and without Batch Normalization, in order to visualize and analyze why these common deep-network training tricks work and what effect they have.
Network Structure
The network structure is similar to the figure below: the input layer has 784 units (the number of input image features), hidden layer 1 has 512 neurons (tanh activation), hidden layer 2 has 512 neurons (tanh activation), and the output layer has 10 neurons (softmax activation, producing a probability distribution over the 10 classes).
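Using the Model class given in the Source Code section below, this architecture corresponds to a construction roughly like the following. This is only a minimal sketch: the module name model is assumed, and the xavier initializer and Adam optimizer live in the repository's other modules.

from model import Model   # module name assumed; Model is shown in the Source Code section below

# input layer (784) + two hidden layers (512, tanh) + output layer (10, softmax)
model = Model(num_layers=4,
              units_list=[784, 512, 512, 10],
              initializer='xavier',
              optimizer='adam')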
Training Pipeline
Input data
For each mini-batch, x is reshaped to [b, 784] and y is one-hot encoded to [b, 10].
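As an illustration, a self-contained sketch of this preprocessing is shown below. The repository's own onehot helper lives in utils; onehot_sketch here is a hypothetical stand-in.

import numpy as np

def onehot_sketch(y, num_classes=10):
    """Standalone sketch of one-hot encoding: [b, 1] integer labels -> [b, num_classes]."""
    y = y.reshape(-1).astype(int)
    out = np.zeros((y.shape[0], num_classes))
    out[np.arange(y.shape[0]), y] = 1.0
    return out

x = np.random.rand(32, 28, 28)               # a dummy batch of 32 MNIST-sized images
y = np.random.randint(0, 10, size=(32, 1))   # dummy integer labels
x = x.reshape(-1, 28 * 28)                   # [32, 784]
y = onehot_sketch(y, 10)                     # [32, 10]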
Forward propagation
Run the forward computation to obtain the input and output of every layer.
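Written out for the architecture above, with weight matrices $W_1, W_2, W_3$ and no bias terms (matching the x @ w.T operations in the code below), the forward pass for a batch $X \in \mathbb{R}^{b \times 784}$ is

$$A_1 = \tanh(X W_1^{\top}), \qquad A_2 = \tanh(A_1 W_2^{\top}), \qquad \hat{Y} = \mathrm{softmax}(A_2 W_3^{\top}),$$

where the pre-activation values are stored as net_inputs and the post-activation values as net_outputs.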
Backward propagation
Compute the gradient of every parameter according to the backpropagation (BP) rules.
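For the softmax output combined with the cross-entropy loss, the output-layer error simplifies to $\hat{Y} - Y$, and the hidden-layer errors follow the chain rule through tanh. These are exactly the formulas implemented in the backward method below:

$$dZ_3 = \hat{Y} - Y, \qquad dW_3 = dZ_3^{\top} A_2,$$
$$dZ_l = \big(dZ_{l+1} W_{l+1}\big) \odot \tanh'(Z_l), \qquad dW_l = dZ_l^{\top} A_{l-1}, \quad l = 2, 1,$$

with $A_0 = X$.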
Parameter optimization
Update the parameters with the Adam algorithm.
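The optimizers module is not reproduced in this article; it is invoked through optimizer.optimize(weight_num, params, grads, batch_size). For reference, a single Adam update for one weight matrix has the standard form sketched below, with typical default hyperparameters. This is only a sketch, not necessarily the repository's exact implementation.

import numpy as np

def adam_step(w, dw, m, v, t, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8):
    """One Adam update for a single weight matrix (standard textbook form)."""
    m = beta1 * m + (1 - beta1) * dw           # first-moment (mean) estimate
    v = beta2 * v + (1 - beta2) * dw ** 2      # second-moment (uncentered variance) estimate
    m_hat = m / (1 - beta1 ** t)               # bias correction
    v_hat = v / (1 - beta2 ** t)
    w = w - lr * m_hat / (np.sqrt(v_hat) + eps)
    return w, m, v

# hypothetical usage for one weight matrix at step t:
# w1, m1, v1 = adam_step(w1, grads['dw1'] / batch_size, m1, v1, t)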
Source Code
Since the project contains several modules, only the core model source is given here; the remaining modules can be found in the GitHub repository linked at the end of the article.
""" Author: Zhou Chen Date: 2019/12/4 Desc: 構建模型 """ import numpy as np from initializers import xavier, zero from utils import onehot from activations import tanh, softmax, softmax_gradient, tanh_gradient from losses import cross_entropy from optimizers import SGD, Adamdef dropout(x, p):"""以概率p丟棄神經元連接,為了處理方便采用反向Dropout思路,該方法無需修改測試網絡"""keep_prob = 1 - p# z這里寫的時候犯了一個錯誤,就是不應該批量生成概率矩陣,而是生成的概率矩陣批量重復d_temp = np.random.binomial(1, keep_prob, size=x.shape[1:]) / keep_probd_temp = d_temp.reshape(-1)x_dropout = x * d_tempreturn x_dropout, d_tempclass Model(object):def __init__(self, num_layers, units_list=None, initializer=None, optimizer='adam'):self.weight_num = num_layers - 1# 根據傳入的初始化方法初始化參數,本次實驗只實現xavier和全0初始化self.params = xavier(num_layers, units_list) if initializer == 'xavier' else zero(num_layers, units_list)self.optimizer = Adam(weights=self.params, weight_num=self.weight_num) if optimizer == 'adam' else SGD()self.bn_param = {}def forward(self, x, dropout_prob=None):"""前向傳播,針對一個mini-batch處理"""net_inputs = [] # 各層的輸入net_outputs = [] # 各層激活后的輸出net_d = []# 為了層號對應,將輸入層直接添加net_inputs.append(x)net_outputs.append(x)net_d.append(np.ones(x.shape[1:])) # 輸入層無丟棄概率for i in range(1, self.weight_num): # 參數數量比層數少1x = x @ self.params['w'+str(i)].Tnet_inputs.append(x)x = tanh(x)if dropout_prob:# 訓練階段丟棄x, d_temp = dropout(x, dropout_prob)net_d.append(d_temp)net_outputs.append(x)out = x @ self.params['w'+str(self.weight_num)].Tnet_inputs.append(out)out = softmax(out)net_outputs.append(out)return {'net_inputs': net_inputs, 'net_outputs': net_outputs, 'd': net_d}, outdef backward(self, nets, y, pred, dropout_prob=None):"""dz[out] = out - ydw[out] = dz[out] @ outputs[out-1].Tdb[out] = dz[out]dz[i] = W[i+1]dz[i+1] * grad(z[i])dw[i] = dz[i] @ outputs[i-1]db[i] = dz[i]sa"""grads = dict()grads['dz'+str(self.weight_num)] = (pred - y) # [b, 10]grads['dw'+str(self.weight_num)] = grads['dz'+str(self.weight_num)].T @ nets['net_outputs'][self.weight_num-1] #[10, 512]for i in reversed(range(1, self.weight_num)):temp = grads['dz' + str(i + 1)] @ self.params['w' + str(i + 1)] * tanh_gradient(nets['net_inputs'][i])if dropout_prob:temp = temp * nets['d'][i] / (1-dropout_prob)grads['dz'+str(i)] = temp # [b, 128]grads['dw'+str(i)] = grads['dz'+str(i)].T @ nets['net_outputs'][i-1]return gradsdef train(self, data_loader, valid_loader, epochs, learning_rate, dropout_prob=None):losses_train = []losses_valid = []for epoch in range(epochs):print("epoch", epoch)# 訓練部分epoch_loss_train = 0for step, (x, y) in enumerate(data_loader):# x:[b, 28, 28] -> [b, 784] , y:[b, 1] -> [b, 10]x = x.reshape(-1, 28 * 28)y = onehot(y, 10)nets, pred = self.forward(x, dropout_prob)loss = cross_entropy(y, pred)epoch_loss_train += lossgrads = self.backward(nets, y, pred, dropout_prob)# SGD更新參數# self.params = optimizer.optimize(self.weight_num, self.params, grads, y.shape[0])self.params = self.optimizer.optimize(self.weight_num, self.params, grads, y.shape[0])if step % 100 == 0:print("epoch {} training step {} loss {:.4f}".format(epoch, step, loss))losses_train.append(epoch_loss_train)print(epoch_loss_train)data_loader.restart()# 驗證部分,只進行前向傳播epoch_loss_valid = 0for step, (x, y) in enumerate(valid_loader):x = x.reshape(-1, 28 * 28)y = onehot(y, 10)nets, pred = self.forward(x, dropout_prob)loss = cross_entropy(y, pred)epoch_loss_valid += lossif step % 100 == 0:print("epoch {} validation step {} loss {:.4f}".format(epoch, step, loss))losses_valid.append(epoch_loss_valid)valid_loader.restart()his = {'train_loss': losses_train, 'valid_loss': losses_valid}return 
hisdef batch_norm(self, x, layer_index, mode):epsilon = 1e-6momentum = 0.9N, D = x.shapeglobal_mean = self.bn_param.get('global_mean' + str(layer_index), np.zeros(D, dtype=x.dtype))global_var = self.bn_param.get('global_var' + str(layer_index), np.zeros(D, dtype=x.dtype))cache = Noneif mode == 'train':# 計算當前batch的均值和方差sample_mean = np.mean(x, axis=0)sample_var = np.var(x, axis=0)x_hat = (x - sample_mean) / np.sqrt(sample_var + epsilon)out = self.params['gamma' + str(layer_index)] * x_hat + self.params['beta' + str(layer_index)] # bn結束global_mean = momentum * global_mean + (1 - momentum) * sample_meanglobal_var = momentum * global_var + (1 - momentum) * sample_varcache = {'x': x, 'x_hat': x_hat, 'sample_mean': sample_mean, 'sample_var': sample_var}else:# 測試模式,使用全局均值和方差標準化x_hat = (x - global_mean) / np.sqrt(global_var + epsilon)out = self.params['gamma' + str(layer_index)] * x_hat + self.params['beta' + str(layer_index)]self.bn_param['global_mean' + str(layer_index)] = global_meanself.bn_param['global_var' + str(layer_index)] = global_varreturn out, cachedef forward_bn(self, x, bn_mode='train'):"""帶BN層的前向傳播"""net_inputs = []net_outputs = []caches = []net_inputs.append(x)net_outputs.append(x)caches.append(x)for i in range(1, self.weight_num):# 所有隱層的輸入都進行BN,輸入層和輸出層不進行BNx = x = x @ self.params['w'+str(i)].Tnet_inputs.append(x)x, cache = self.batch_norm(x, i, bn_mode) # 可以將BN理解為加在隱藏層神經元輸入和輸出間可訓練的一層caches.append(cache)x = tanh(x)net_outputs.append(x)out = x @ self.params['w' + str(self.weight_num)].Tnet_inputs.append(out)out = softmax(out)net_outputs.append(out)return {'net_inputs': net_inputs, 'net_outputs': net_outputs, 'cache': caches}, outdef backward_bn(self, nets, y, pred):"""加入BN層的反向傳播"""epsilon = 1e-6momentum = 0.9grads = dict()# 求解輸出層梯度,依據鏈式法則,無BNgrads['dz' + str(self.weight_num)] = (pred - y)grads['dw' + str(self.weight_num)] = grads['dz' + str(self.weight_num)].T @ nets['net_outputs'][self.weight_num - 1]for i in reversed(range(1, self.weight_num)):N = nets['cache'][i]['x'].shape[0]grads['dz'+str(i)] = grads['dz' + str(i + 1)] @ self.params['w' + str(i + 1)]grads['dgamma'+str(i)] = np.sum(grads['dz'+str(i)] * nets['cache'][i]['x_hat'])grads['dbeta'+str(i)] = np.sum(grads['dz'+str(i)], axis=0)dx_hat = grads['dz'+str(i)] * self.params['gamma'+str(i)]dsigma = -0.5 * np.sum(dx_hat * (nets['cache'][i]['x'] - nets['cache'][i]['sample_mean']), axis=0) * np.power(nets['cache'][i]['sample_var'][i] + epsilon, -1.5)dmu = -np.sum(dx_hat / np.sqrt(nets['cache'][i]['sample_var'] + epsilon), axis=0) - 2 * dsigma * np.sum(nets['cache'][i]['x'] - nets['cache'][i]['sample_mean'], axis=0) / Ndx = dx_hat / np.sqrt(nets['cache'][i]['sample_var'] + epsilon) + 2.0 * dsigma * (nets['cache'][i]['x'] - nets['cache'][i]['sample_mean']) / N + dmu / Ntemp = dx * tanh_gradient(nets['net_inputs'][i])grads['dw'+str(i)] = temp.T @ nets['net_outputs'][i-1]return gradsdef train_bn(self, data_loader, valid_loader, epochs, learning_rate):losses_train = []losses_valid = []for epoch in range(epochs):print("epoch", epoch)epoch_loss_train = 0# 重置全局均值和方差# 批量訓練for step, (x, y) in enumerate(data_loader):# x:[b, 28, 28] -> [b, 784] , y:[b, 1] -> [b, 10]x = x.reshape(-1, 28 * 28)y = onehot(y, 10)nets, pred = self.forward_bn(x, bn_mode='train')grads = self.backward_bn(nets, y, pred)self.optimizer.optimize(self.weight_num, self.params, grads, y.shape[0])loss = cross_entropy(y, pred)epoch_loss_train += lossif step % 100 == 0:print("epoch {} step {} loss {:.4f}".format(epoch, step, 
loss))losses_train.append(epoch_loss_train)data_loader.restart()print(epoch_loss_train)# 驗證集測試epoch_loss_valid = 0for step, (x, y) in enumerate(valid_loader):x = x.reshape(-1, 28 * 28)y = onehot(y, 10)nets, pred = self.forward_bn(x, bn_mode='test')loss = cross_entropy(y, pred)epoch_loss_valid += lossif step % 100 == 0:print("epoch {} step {} loss {:.4f}".format(epoch, step, loss))losses_valid.append(epoch_loss_valid)valid_loader.restart()his = {'train_loss': losses_train, 'valid_loss': losses_valid}return hisdef predict(self, data_loader, bn=False):labels = []pred = []losses = 0for (x, y) in data_loader:x = x.reshape(-1, 28 * 28)y = onehot(y, 10)if bn:_, out = self.forward_bn(x, 'test')else:_, out = self.forward(x)loss = cross_entropy(y, out)losses += lossout = list(np.argmax(out, axis=-1).flatten())y = list(np.argmax(y, axis=1).flatten())labels += ypred += outreturn np.array(pred).astype('int'), np.array(labels).astype('int')訓練效果
With vs. Without Normalization
Main purpose
Rescale the input values so that, when optimizing with gradient descent, a single learning rate works for every layer (otherwise the input layer would require a much smaller learning rate).
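For MNIST this mostly amounts to rescaling the raw pixel values; a typical choice is sketched below. This is an assumption for illustration only; the exact scheme used in the experiments lives in the data-loading module.

import numpy as np

x = np.random.randint(0, 256, size=(32, 784))    # a dummy batch of raw MNIST pixels
x = x.astype('float32') / 255.0                  # map pixels from [0, 255] to [0, 1]
# alternatively, standardize with statistics computed on the training set:
# x = (x - train_mean) / (train_std + 1e-8)      # train_mean / train_std are precomputed (hypothetical names)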
Effect
Faster convergence, i.e., the loss decreases more quickly.
(Figures: loss curves on the training, validation, and test sets.)
With vs. Without Dropout
Main purpose
Randomly dropping neurons reduces the co-adaptation between them, forcing the network to learn more robust representations and effectively suppressing overfitting.
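The dropout helper in the model source above implements this as inverted Dropout: surviving activations are scaled by 1/keep_prob at training time so that the test-time forward pass needs no change. A quick illustration of its behavior, using that dropout function:

import numpy as np

x = np.random.rand(32, 512)            # activations of one hidden layer for a batch of 32
x_drop, mask = dropout(x, p=0.2)       # dropout() as defined in the model source above
# about 20% of the features are zeroed out (the same per-feature mask is shared
# across the batch), and the survivors are scaled by 1/0.8, so the expected
# activation magnitude stays roughly unchanged:
print(x.mean(), x_drop.mean())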
Effect
Training converges more slowly, but test performance improves.
(Figures: loss curves on the training, validation, and test sets.)
With vs. Without Batch Normalization
Main purpose
Normalize the input of each hidden layer towards a standard normal distribution (before the learnable scale and shift), which speeds up learning and alleviates the training difficulties caused by shifting layer-input distributions.
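Concretely, at training time the batch_norm method in the source code above standardizes each feature of a layer's input with the mini-batch statistics and then applies a learnable scale and shift,

$$\hat{x} = \frac{x - \mu_B}{\sqrt{\sigma_B^2 + \epsilon}}, \qquad y = \gamma \hat{x} + \beta,$$

while at test time the batch statistics $\mu_B$ and $\sigma_B^2$ are replaced by the running averages accumulated during training.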
Effect
Training is accelerated.
(Figures: loss curves on the training, validation, and test sets.)
Additional Notes
In this project the entire neural-network training procedure is written by hand in NumPy; corrections for any oversights are welcome. The source code is available on my GitHub, where stars and forks are appreciated.