pytorch--从零实现一个BERT模型
本文主要從代碼的角度一步步來講解一下BERT模型是如何實現的。(后附完整代碼)
關于BERT的理論解析現在隨便一搜就可以找到很多,在這里就不在贅述。
BERT源碼
- 模型架構
- 模型實現
- 定義超參
- 文本預處理
- make_batch()
- model
- Embedding
- get_attn_pad_mask
- layers
- MultiHeadAttention
- ScaledDotProductAttention
- PoswiseFeedForwardNet
- Loss
- 總結:
- 完整源碼
- 特別鳴謝:
模型架構
首先先說一下大佬傳授的技巧 😃
寫模型代碼要注意兩點:
1.從整體到局部;
2.數據流動形狀;
怎么理解呢?對于一個模型的搭建我們不可能一蹴而就,要先從整體入手,先把大框搭起來,然后在實現每個函數具體的功能。
對于“數據流動形狀”,要著重關注這個函數輸入輸出,比如經過Embedding層,原數據肯定會多一個維度,增加的這個維度后續我們會怎么處理。
先大概說一下我們這個模型要干什么事,先對bert這個模型有個大概一下印象。
a. 第一個字符 < cls>對應的輸出接一個linear層做一個二分類即NSP任務;
b.mask對應位置的輸出接一個解碼層 將768維的Embedding映射成詞表大小,然后與真實標簽做loss。
再來看一下4.中的bert都干了些什么?
首先將輸入文本做一個Embedding(①),然后送入多頭注意力機制中(②),輸出接一個Layer Normalization和殘差連接(③)最后送入兩個linear層中(④)
這就是上圖中一個Encoder做的事情。
到這大家對我們要干什么腦海里應該有了一個模糊的框架。
接下來就是代碼部分。完全按照上面描繪的走~
下面代碼的一個整體框架,其中make_batch ,model部分是重點
而model中的layers又是重重之中
模型實現
我們先從整體 即“main”入手:
定義超參
if __name__ == '__main__':# BERT Parametersmaxlen = 30 # 句子的最大長度batch_size = 6 # 每一組有多少個句子一起送進去模型max_pred = 5 # max tokens of predictionn_layers = 6 # number of Encoder of Encoder Layern_heads = 12 # number of heads in Multi-Head Attentiond_model = 768 # Embedding Sized_ff = 3072 # 4*d_model, FeedForward dimensiond_k = d_v = 64 # dimension of K(=Q), Vn_segments = 2 # 用于NSP任務首先是定義一些超參,具體作用均已標注。
其中需要注意的一個參數是:max_pred 它表示的是一個句子中最多可以有多少個mask,怎么用后面我們會談到(一個坑,在make_batch()部分會講到)
文本預處理
因為我們注重的是模型的實現,所以數據部分就自己定義了一些對話語句,在實際應用中往往會是海量的文本。
text = ('Hello, how are you? I am Romeo.\n''Hello, Romeo My name is Juliet. Nice to meet you.\n''Nice meet you too. How are you today?\n''Great. My baseball team won the competition.\n''Oh Congratulations, Juliet\n''Thanks you Romeo')數據有了,接下來就是對數據進行處理
sentences = re.sub("[.,!?\\-]", '', text.lower()).split('\n') # filter '.', ',', '?', '!'通過re.sub函數將數據中的特殊字符清除掉 并將大寫字符全部轉變為小寫字符
效果:
word_list:根據“ ”空格切分后的單詞列表
word_dict: 加入特殊字符生成的詞典
number_dict:將word_dict的鍵值對調換(預測時候會用到)
效果:
根據剛剛生成好的word_dict 將sentences 中的字符轉換成數字 方便后面處理
效果:
文本預處理完~~~~~~
繼續往下看就是最重要的數據構建部分了
make_batch()
batch = make_batch() # 最重要的一部分 預訓練任務的數據構建部分input_ids, segment_ids, masked_tokens, masked_pos, isNext = map(torch.LongTensor, zip(*batch))# map把函數依次作用在list中的每一個元素上,得到一個新的list并返回。注意,map不改變原list,而是返回一個新list。通過 make_batch() 對數據進行處理,
得到 input_ids, segment_ids, masked_tokens, masked_pos, isNext
我們跳到make_batch()函數部分看一下這幾個代表的是什么,和它具體是怎么處理的。
實現的效果:
input_ids 是下圖中的Token Embeddings
segment_ids就是下圖中的Segment Embeddings
接下來是要對剛剛拼接好的input_ids進行mask處理:
# MASK LMn_pred = min(max_pred, max(1, int(round(len(input_ids) * 0.15)))) # n_pred=3;整個句子的15%的字符可以被mask掉,這里取和max_pred中的最小值,確保每次計算損失的時候沒有那么多字符以及信息充足,有15%做控制就夠了;其實可以不用加這個,單個句子少了,就要加上足夠的訓練樣本# 不讓特殊字符參與maskcand_maked_pos = [i for i, token in enumerate(input_ids)if token != word_dict['[CLS]'] and token != word_dict['[SEP]']] ## cand_maked_pos=[1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18];整個句子input_ids中可以被mask的符號必須是非cls和sep符號的,要不然沒意義shuffle(cand_maked_pos)## 打亂順序:cand_maked_pos=[6, 5, 17, 3, 1, 13, 16, 10, 12, 2, 9, 7, 11, 18, 4, 14, 15] 其實取mask對應的位置有很多方法,這里只是一種使用shuffle的方式在這部分就要用到我們最開始提到的那個值得注意的超參:max_pred(填坑)
為什么需要max_pred?
比如在mask時候,一個句子被mask了3個單詞,另一個句子被mask了7個單詞。
很難把這兩個句子組成一個有效的矩陣。我們之前做了一個最大長度的截斷,這max_pred也相當一個截斷參數。
cand_maked_pos的作用是去掉特殊字符< CLS > < SEP>,整個句子input_ids中可以被mask的符號必須是非cls和sep符號的,要不然沒意義
masked_tokens對應的是被mask元素之前的原始的單字數字,
masked_pos 對應的是position信息
然后對其按照8/1/1比例mask
想要實現的效果:
接下來是補零操作:
為什么要補零呢?
是為了計算一個batch中句子的mlm損失的時候可以組成一個有效矩陣放進去;不然第一個句子預測5個字符,第二句子預測7個字符,第三個句子預測8個字符,組不成一個有效的矩陣;
還有一個點,為什么補的是零,而不是其他值?
在后面的loss部分會給出解釋。
通過 batch.append添加的字段就是我們要得到 input_ids, segment_ids, masked_tokens, masked_pos, isNext
input_ids 是bert輸入的Token Embeddings
segment_idsbert輸入的Segment Embeddings
masked_tokens對應的是被mask元素之前的原始的單字數字,
masked_pos 對應的是position信息
isNext 代表這兩個句子是否是相鄰的上下文
make_batch()完~~~~~~
回到main()繼續往下看
定義模型,損失函數 和 優化策略
model = BERT()criterion = nn.CrossEntropyLoss(ignore_index=0) # 只計算mask位置的損失optimizer = optim.Adam(model.parameters(), lr=0.001)這里有一個細節是nn.CrossEntropyLoss(ignore_index=0) 我們定義了一個ignore_index為0,
我們可以看到loss 中給出的解釋的我們可以指定一個值,這個值不參與計算。也就是說我們后面在計算loss的時候,0不參與計算,即我們對masked_tokens補零后不影響結果。
接下來是main()中的調用部分
for epoch in range(100):optimizer.zero_grad()# logits_lm 語言詞表的輸出# logits_clsf 二分類的輸出# logits_lm:[batch_size, max_pred, n_vocab]logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos)## logits_lm 【6,5,29】 bs*max_pred*voca logits_clsf:[6*2]model
我們傳給model 三個參數input_ids, segment_ids, masked_pos,分別為
input_ids :bert輸入的Token Embeddings
segment_ids :bert輸入的Segment Embeddings
masked_pos :對應的是選中那15%的position信息
我們剛剛在make_baatch已經說的很清楚了,還是不太明白的可以在回去看一下~
我們來看一下model = BERT()的詳細處理過程:
首先是一些定義
其中值得關注的是self.embedding,self.layers 這也是我們要重點講的
下面是BERT的實現部分
def forward(self, input_ids, segment_ids, masked_pos):# 將input_ids,segment_ids,pos_embed加和得到inputinput = self.embedding(input_ids, segment_ids)我們將input_ids, segment_ids傳給Embedding,那Embedding會進行什么操作呢?
Embedding
class Embedding(nn.Module):def __init__(self):super(Embedding, self).__init__()self.tok_embed = nn.Embedding(vocab_size, d_model) # token embeddingself.pos_embed = nn.Embedding(maxlen, d_model) # position embeddingself.seg_embed = nn.Embedding(n_segments, d_model) # segment(token type) embedding# self.norm = nn.LayerNorm(d_model)def forward(self, input_ids, segment_ids)seq_len = input_ids.size(1) # input_ids:batch_size x len x d_modelpos = torch.arange(seq_len, dtype=torch.long) #生成數組pos = pos.unsqueeze(0).expand_as(input_ids) # (seq_len,) -> (batch_size, seq_len)embedding = self.tok_embed(input_ids) + self.pos_embed(pos) + self.seg_embed(segment_ids)return embedding首先是在定義部分( init )定義了三個映射規則
然后在實現部分( forward )通過arange生成一個與input_ids 維度一致的數組(已填充好的),然后將傳入的input_ids, segment_ids和生成的pos 相加得到bert的最終輸入,即下圖中的input。
arange函數的效果:
Embedding完~~~~~~
回到model部分我們繼續往下看
是一個get_attn_pad_mask函數,它的作用是為了得到句子中pad的位置信息,給到模型后面,在計算自注意力和交互注意力的時候去掉pad符號的影響。
##get_attn_pad_mask是為了得到句子中pad的位置信息,給到模型后面,在計算自注意力和交互注意力的時候去掉pad符號的影響enc_self_attn_pad = get_attn_pad_mask(input_ids, input_ids)下面我們來具體看一下這個函數
get_attn_pad_mask
def get_attn_pad_mask(seq_q, seq_k): # 在自注意力層q k是一致的batch_size, len_q = seq_q.size()batch_size, len_k = seq_k.size()# eq(zero) is PAD token# eq(0)表示和0相等的返回True,不相等返回False。pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # batch_size x 1 x len_k(=len_q), one is maskingreturn pad_attn_mask.expand(batch_size, len_q, len_k) # 重復了len_q次 batch_size x len_q x len_k 不懂可以看一下例子內容不多也很好懂,seq_k.data.eq(0)是將input_ids中=0的置為True 其他置為False,.unsqueeze(1)的作用是增加一維,然后通過.expand函數重復 len_q次 ,最終會return我們想要的 符號矩陣。
.expand函數:
get_attn_pad_mask完~~~~~~
回到model部分我們繼續往下看
layers
for layer in self.layers:output, enc_self_attn = layer(input, enc_self_attn_pad) ## enc_self_attn這里是QK轉置相乘之后softmax之后的矩陣值,代表的是每個單詞和其他單詞相關性;# output : [batch_size, len, d_model], attn : [batch_size, n_heads, d_mode, d_model]對于layers是我們在最開始就提到它是整個模型的重中之中,因為我們要在layer部分實現最重要的多頭注意力機制和 pos_ffn
對于多頭注意力機制我們的輸入有四個enc_inputs, enc_inputs, enc_inputs, enc_self_attn_pad 分別代表Q K V 和我們之前求出的符號矩陣。
MultiHeadAttention
lass MultiHeadAttention(nn.Module):def __init__(self):super(MultiHeadAttention, self).__init__()## 輸入進來的QKV是相等的,使用映射linear做一個映射得到參數矩陣Wq, Wk,Wvself.W_Q = nn.Linear(d_model, d_k * n_heads)self.W_K = nn.Linear(d_model, d_k * n_heads)self.W_V = nn.Linear(d_model, d_v * n_heads)def forward(self, Q, K, V, attn_pad):## 這個多頭分為這幾個步驟,首先映射分頭,然后計算atten_scores,然后計算atten_value;## 輸入進來的數據形狀: Q: [batch_size x len_q x d_model], K: [batch_size x len_k x d_model], V: [batch_size x len_k x d_model]# q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]residual, batch_size = Q, Q.size(0)# (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)##下面這個就是先映射,后分頭;一定要注意的是q和k分頭之后維度是一致額,所以這里都是dkq_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) # q_s: [batch_size x n_heads x len_q x d_k]k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2) # k_s: [batch_size x n_heads x len_k x d_k]v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2) # v_s: [batch_size x n_heads x len_k x d_v]## 輸入進行的attn_pad形狀是 batch_size x len_q x len_k,然后經過下面這個代碼得到 新的attn_pad : [batch_size x n_heads x len_q x len_k],就是把pad信息重復了n個頭上attn_pad = attn_pad.unsqueeze(1).repeat(1, n_heads, 1, 1) # repeat 對張量重復擴充首先使用映射linear做一個映射得到參數矩陣Wq, Wk,Wv 注意這里的參數矩陣的維度是d_k * n_heads 是‘多頭’之后的
q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2)
的意思是先通過映射得到參數矩陣,在通過.view將其“分頭”并調整維度順序
.view函數效果:
然后通過打分函數得到注意力矩陣context, 注意力分數attn(沒乘V之前的矩陣)
ScaledDotProductAttention部分要實現的就是點積注意力計算公式:
ScaledDotProductAttention
點積注意力計算公式:
其中一個值的注意的點是我們通過scores.masked_fill_(attn_pad, -1e9) 將符號矩陣對應的位置 置為無窮小,這樣經過softmax后它就不會對q的單詞起作用。也就實現了我們想要的去除掉pad對其他單詞影響的效果。
MultiHeadAttention完~~~~~~
回到layers我們繼續看
將剛剛通過多頭注意力機制得到的enc_outputs 送入PoswiseFeedForwardNet ,得到layers最終的結果
PoswiseFeedForwardNet
class PoswiseFeedForwardNet(nn.Module):def __init__(self): # 對每個字的增強語義向量再做兩次線性變換,以增強整個模型的表達能力。super(PoswiseFeedForwardNet, self).__init__()self.fc1 = nn.Linear(d_model, d_ff)self.fc2 = nn.Linear(d_ff, d_model)def forward(self, x):# (batch_size, len_seq, d_model) -> (batch_size, len_seq, d_ff) -> (batch_size, len_seq, d_model)return self.fc2(gelu(self.fc1(x)))這部分就是一個兩層的linear層,沒什么好說的,對每個字的增強語義向量再做兩次線性變換,以增強整個模型的表達能力。這里,變換后的向量與原向量保持長度相同。
pos_ffn完~~~~~~
layers 完~~~~~~
回到model部分我們繼續往下看
h_pooled = self.activ1(self.fc(output[:, 0])) # [batch_size, d_model] cls 對應的位置 可以看一下例子logits_clsf = self.classifier(h_pooled) # [batch_size, 2]masked_pos = masked_pos[:, :, None].expand(-1, -1, output.size(-1)) # [batch_size, max_pred, d_model] 其中一個 masked_pos= [6, 5, 17,0,0]# get masked position from final output of transformer.h_masked = torch.gather(output, 1, masked_pos) #在output取出一維對應masked_pos數據 masking position [batch_size, max_pred, d_model]h_masked = self.norm(self.activ2(self.linear(h_masked)))logits_lm = self.decoder(h_masked) + self.decoder_bias # [batch_size, max_pred, n_vocab]return logits_lm, logits_clsf通過output[:, 0]切片的方式得到cls對應位置的信息 將其送入一個linear層得到h_pooled 再將其送入一個二分類的linear 得到nsp任務的結果:logits_clsf
通過torch.gather 在output中取出一維masked_pos 對應的數據 h_masked 再將其送入linear層 并解碼(decoder)得到mlm任務的結果: logits_lm
model 完~~~~~~
model的返回值
logits_lm, 代表 mask對應位置的輸出
logits_clsf,為nsp任務的輸出。
回到main()部分我們繼續往下看
Loss
loss_lm = criterion(logits_lm.transpose(1, 2), masked_tokens) # for masked LM ;masked_tokens [6,5]loss_lm = (loss_lm.float()).mean()loss_clsf = criterion(logits_clsf, isNext) # for sentence classificationloss = loss_lm + loss_clsfif (epoch + 1) % 10 == 0:print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))loss.backward()optimizer.step()BERT 的損失函數由兩部分組成,第一部分是來自 Mask-LM 的「單詞級別分類任務」,另一部分是「句子級別的分類任務」。通過這兩個任務的聯合學習,可以使得 BERT 學習到的表征既有 token 級別信息,同時也包含了句子級別的語義信息。
最后再來看一下我們一開始提到的模型框架圖,是不是很輕松就能理解了 😃
總結:
BERT文章作者提出了兩個預訓練任務:Masked LM和Next Sentence Prediction
BERT的第一個任務是采用 MaskLM 的方式來訓練語言模型,通俗地說就是在輸入一句話的時
候,隨機地選一些要預測的詞,然后用一個特殊的符號[MASK]來代替它們,因為我們知道被蓋起來的部分是什么,但BERT不知道,所以BERT學習的目標就是 :輸出跟蓋起來的越接近越好。思想來源于 「完形填空」 的 任 務 。 具體來說 , 文章作者在一句話中隨機選擇 15% 的 詞匯用于預 測 。 對于在原句中被 抹 去 的 詞 匯 :
80% 情況下采用 一 個特殊符號 [MASK] 替 換 ,
10% 情況下采用 一 個任意詞替換,
剩余 10% 情況下保持原詞匯不變
這樣做的好處是,BERT 并不知道[MASK]替換的是這 15%個 Token 中的哪一個詞(「注意:這里意思是輸入的時候不知道[MASK] 替換的是哪一個詞,但是輸出還是知道要預測哪個詞的」)
Next Sentence Prediction 的任務描述為:給定一篇文章中的兩句話,判斷第二句話在文本中是否緊跟在第一句話之后。
這個類似于 「段落重排序」 的任務
只考慮兩句話,判斷是否是一篇文章中的前后句。在實際預訓練過程中,
文章作者從文本語料庫中隨機選擇 50% 正確語句對和 50% 錯誤語句對進行訓練,在第一個句子的首部會加上一個[CLS] token,在兩個句子中間以及最后一個句子的尾部會加上一個[SEP] token。
這樣能讓模型去學習一下句子層面的信息。
本文完~~~~~~
完整源碼
""" orginal from : https://github.com/graykode/nlp-tutorial/tree/master/5-2.BERT """ import math import re from random import * import numpy as np import torch import torch.nn as nn import torch.optim as optim# 數據預處理 def make_batch():batch = [] # listpositive = negative = 0 # 計數器 為了記錄NSP任務中的正樣本和負樣本的個數,比例最好是在一個batch中接近1:1while positive != batch_size/2 or negative != batch_size/2:# 抽出來兩句話 先隨機sample兩個index 再通過index找出樣本tokens_a_index, tokens_b_index= randrange(len(sentences)), randrange(len(sentences)) # 比如tokens_a_index=3,tokens_b_index=1;從整個樣本中抽取對應的樣本;tokens_a, tokens_b= token_list[tokens_a_index], token_list[tokens_b_index]## 根據索引獲取對應樣本:tokens_a=[5, 23, 26, 20, 9, 13, 18] tokens_b=[27, 11, 23, 8, 17, 28, 12, 22, 16, 25]# 拼接input_ids = [word_dict['[CLS]']] + tokens_a + [word_dict['[SEP]']] + tokens_b + [word_dict['[SEP]']] ## 加上特殊符號,CLS符號是1,sep符號是2:[1, 5, 23, 26, 20, 9, 13, 18, 2, 27, 11, 23, 8, 17, 28, 12, 22, 16, 25, 2]segment_ids = [0] * (1 + len(tokens_a) + 1) + [1] * (len(tokens_b) + 1)##分割句子符號:[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]# MASK LMn_pred = min(max_pred, max(1, int(round(len(input_ids) * 0.15)))) # n_pred=3;整個句子的15%的字符可以被mask掉,這里取和max_pred中的最小值,確保每次計算損失的時候沒有那么多字符以及信息充足,有15%做控制就夠了;其實可以不用加這個,單個句子少了,就要加上足夠的訓練樣本# 不讓特殊字符參與maskcand_maked_pos = [i for i, token in enumerate(input_ids)if token != word_dict['[CLS]'] and token != word_dict['[SEP]']] ## cand_maked_pos=[1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18];整個句子input_ids中可以被mask的符號必須是非cls和sep符號的,要不然沒意義shuffle(cand_maked_pos)## 打亂順序:cand_maked_pos=[6, 5, 17, 3, 1, 13, 16, 10, 12, 2, 9, 7, 11, 18, 4, 14, 15] 其實取mask對應的位置有很多方法,這里只是一種使用shuffle的方式masked_tokens, masked_pos = [], []for pos in cand_maked_pos[:n_pred]: # 取其中的三個;masked_pos=[6, 5, 17] 注意這里對應的是position信息;masked_tokens=[13, 9, 16] 注意這里是被mask的元素之前對應的原始單字數字;masked_pos.append(pos)masked_tokens.append(input_ids[pos]) # 回到ppt看一下if random() < 0.8: # 80%input_ids[pos] = word_dict['[MASK]'] # make maskelif random() < 0.5: # 10%index = randint(0, vocab_size - 1) # random index in vocabularyinput_ids[pos] = word_dict[number_dict[index]] # replace# Zero Paddingsn_pad = maxlen - len(input_ids)##maxlen=30;n_pad=10input_ids.extend([0] * n_pad)segment_ids.extend([0] * n_pad)# 這里有一個問題,0和之前的重了# Zero Padding (100% - 15%) tokens 是為了計算一個batch中句子的mlm損失的時候可以組成一個有效矩陣放進去;不然第一個句子預測5個字符,第二句子預測7個字符,第三個句子預測8個字符,組不成一個有效的矩陣;## 這里非常重要,為什么是對masked_tokens是補零,而不是補其他的字符????我補1可不可以??if max_pred > n_pred:n_pad = max_pred - n_predmasked_tokens.extend([0] * n_pad)## masked_tokens= [13, 9, 16, 0, 0] masked_tokens 對應的是被mask的元素的原始真實標簽是啥,也就是groundtruthmasked_pos.extend([0] * n_pad)## masked_pos= [6, 5, 17,0,0] masked_pos是記錄哪些位置被mask了if tokens_a_index + 1 == tokens_b_index and positive < batch_size/2:batch.append([input_ids, segment_ids, masked_tokens, masked_pos, True]) # IsNextpositive += 1elif tokens_a_index + 1 != tokens_b_index and negative < batch_size/2:batch.append([input_ids, segment_ids, masked_tokens, masked_pos, False]) # NotNextnegative += 1return batch # 符號矩陣 def get_attn_pad_mask(seq_q, seq_k): # 在自注意力層q k是一致的batch_size, len_q = seq_q.size()batch_size, len_k = seq_k.size()# eq(zero) is PAD token# eq(0)表示和0相等的返回True,不相等返回False。pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # batch_size x 1 x len_k(=len_q), one is maskingreturn pad_attn_mask.expand(batch_size, len_q, len_k) # 重復了len_q次 batch_size x len_q x len_k 不懂可以看一下例子 def gelu(x):"Implementation of the gelu activation function by Hugging Face"return x * 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0)))#Embedding層 class Embedding(nn.Module):def __init__(self):super(Embedding, self).__init__()self.tok_embed = nn.Embedding(vocab_size, d_model) # token embeddingself.pos_embed = nn.Embedding(maxlen, d_model) # position embeddingself.seg_embed = nn.Embedding(n_segments, d_model) # segment(token type) embeddingself.norm = nn.LayerNorm(d_model)def forward(self, input_ids, segment_ids):# x對應input_ids, seg對應segment_idsseq_len = input_ids.size(1)pos = torch.arange(seq_len, dtype=torch.long)pos = pos.unsqueeze(0).expand_as(input_ids) # (seq_len,) -> (batch_size, seq_len)embedding = self.tok_embed(input_ids) + self.pos_embed(pos) + self.seg_embed(segment_ids)return self.norm(embedding)# 注意力打分函數 class ScaledDotProductAttention(nn.Module):def __init__(self):super(ScaledDotProductAttention, self).__init__()def forward(self, Q, K, V, attn_pad):## 輸入進來的維度分別是 [batch_size x n_heads x len_q x d_k] K: [batch_size x n_heads x len_k x d_k] V: [batch_size x n_heads x len_k x d_v]##首先經過matmul函數得到的scores形狀是 : [batch_size x n_heads x len_q x len_k]scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]## 然后關鍵詞地方來了,下面這個就是用到了我們之前重點講的attn_pad,把被pad的地方置為無限小,softmax之后基本就是0,對q的單詞不起作用scores.masked_fill_(attn_pad, -1e9) # Fills elements of self tensor with value where mask is one.attn = nn.Softmax(dim=-1)(scores)context = torch.matmul(attn, V)return context, attn#多頭注意力機制 class MultiHeadAttention(nn.Module):def __init__(self):super(MultiHeadAttention, self).__init__()## 輸入進來的QKV是相等的,使用映射linear做一個映射得到參數矩陣Wq, Wk,Wvself.W_Q = nn.Linear(d_model, d_k * n_heads)self.W_K = nn.Linear(d_model, d_k * n_heads)self.W_V = nn.Linear(d_model, d_v * n_heads)def forward(self, Q, K, V, attn_pad):## 這個多頭分為這幾個步驟,首先映射分頭,然后計算atten_scores,然后計算atten_value;## 輸入進來的數據形狀: Q: [batch_size x len_q x d_model], K: [batch_size x len_k x d_model], V: [batch_size x len_k x d_model]# q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]residual, batch_size = Q, Q.size(0)# (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)##下面這個就是先映射,后分頭;一定要注意的是q和k分頭之后維度是一致額,所以這里都是dkq_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) # q_s: [batch_size x n_heads x len_q x d_k]k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2) # k_s: [batch_size x n_heads x len_k x d_k]v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2) # v_s: [batch_size x n_heads x len_k x d_v]## 輸入進行的attn_pad形狀是 batch_size x len_q x len_k,然后經過下面這個代碼得到 新的attn_pad : [batch_size x n_heads x len_q x len_k],就是把pad信息重復了n個頭上attn_pad = attn_pad.unsqueeze(1).repeat(1, n_heads, 1, 1) # repeat 對張量重復擴充# context: [batch_size x n_heads x len_q x d_v], attn: [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_pad)context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) # context: [batch_size x len_q x n_heads * d_v]output = nn.Linear(n_heads * d_v, d_model)(context)return nn.LayerNorm(d_model)(output + residual), attn # output: [batch_size x len_q x d_model]#基于位置的前饋神經網絡 class PoswiseFeedForwardNet(nn.Module):def __init__(self): # 對每個字的增強語義向量再做兩次線性變換,以增強整個模型的表達能力。super(PoswiseFeedForwardNet, self).__init__()self.fc1 = nn.Linear(d_model, d_ff)self.fc2 = nn.Linear(d_ff, d_model)def forward(self, x):# (batch_size, len_seq, d_model) -> (batch_size, len_seq, d_ff) -> (batch_size, len_seq, d_model)return self.fc2(gelu(self.fc1(x)))#Encoder class EncoderLayer(nn.Module):def __init__(self):super(EncoderLayer, self).__init__()self.enc_self_attn = MultiHeadAttention()self.pos_ffn = PoswiseFeedForwardNet()def forward(self, enc_inputs, enc_self_attn_pad):enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_pad) # enc_inputs to same Q,K,V enc_self_attn_mask是pad符號矩陣enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size x len_q x d_model]return enc_outputs, attn## 1. BERT模型整體架構 class BERT(nn.Module):def __init__(self):super(BERT, self).__init__()self.embedding = Embedding() ## 詞向量層,構建詞表矩陣self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)]) ## 把N個encoder堆疊起來,具體encoder實現一會看self.fc = nn.Linear(d_model, d_model) ## 前饋神經網絡-clsself.activ1 = nn.Tanh() ## 激活函數-clsself.linear = nn.Linear(d_model, d_model)#-mlmself.activ2 = gelu ## 激活函數--mlmself.norm = nn.LayerNorm(d_model)self.classifier = nn.Linear(d_model, 2)## cls 這是一個分類層,維度是從d_model到2,對應我們架構圖中就是這種:# decoder is shared with embedding layerembed_weight = self.embedding.tok_embed.weightn_vocab, n_dim = embed_weight.size()self.decoder = nn.Linear(n_dim, n_vocab, bias=False)self.decoder.weight = embed_weightself.decoder_bias = nn.Parameter(torch.zeros(n_vocab))def forward(self, input_ids, segment_ids, masked_pos):input = self.embedding(input_ids, segment_ids) # 將input_ids,segment_ids,pos_embed加和##get_attn_pad_mask是為了得到句子中pad的位置信息,給到模型后面,在計算自注意力和交互注意力的時候去掉pad符號的影響,去看一下這個函數 4.enc_self_attn_pad = get_attn_pad_mask(input_ids, input_ids)for layer in self.layers:output, enc_self_attn = layer(input, enc_self_attn_pad) ## enc_self_attn這里是QK轉置相乘之后softmax之后的矩陣值,代表的是每個單詞和其他單詞相關性;# output : [batch_size, len, d_model], attn : [batch_size, n_heads, d_mode, d_model]h_pooled = self.activ1(self.fc(output[:, 0])) # [batch_size, d_model] cls 對應的位置 可以看一下例子logits_clsf = self.classifier(h_pooled) # [batch_size, 2]masked_pos = masked_pos[:, :, None].expand(-1, -1, output.size(-1)) # [batch_size, max_pred, d_model] 其中一個 masked_pos= [6, 5, 17,0,0]# get masked position from final output of transformer.h_masked = torch.gather(output, 1, masked_pos) #在output取出一維對應masked_pos數據 masking position [batch_size, max_pred, d_model]h_masked = self.norm(self.activ2(self.linear(h_masked)))logits_lm = self.decoder(h_masked) + self.decoder_bias # [batch_size, max_pred, n_vocab]return logits_lm, logits_clsf # 1.從整體到局部 # 2.數據流動形狀(輸入 輸出) if __name__ == '__main__':# BERT Parametersmaxlen = 30 # 句子的最大長度batch_size = 6 # 每一組有多少個句子一起送進去模型max_pred = 5 # max tokens of predictionn_layers = 6 # number of Encoder of Encoder Layern_heads = 12 # number of heads in Multi-Head Attentiond_model = 768 # Embedding Sized_ff = 3072 # 4*d_model, FeedForward dimensiond_k = d_v = 64 # dimension of K(=Q), Vn_segments = 2text = ('Hello, how are you? I am Romeo.\n''Hello, Romeo My name is Juliet. Nice to meet you.\n''Nice meet you too. How are you today?\n''Great. My baseball team won the competition.\n''Oh Congratulations, Juliet\n''Thanks you Romeo')sentences = re.sub("[.,!?\\-]", '', text.lower()).split('\n') # filter '.', ',', '?', '!'word_list = list(set(" ".join(sentences).split()))word_dict = {'[PAD]': 0, '[CLS]': 1, '[SEP]': 2, '[MASK]': 3}for i, w in enumerate(word_list):word_dict[w] = i + 4number_dict = {i: w for i, w in enumerate(word_dict)}vocab_size = len(word_dict)# 把文本轉化成數字token_list = list()for sentence in sentences:arr = [word_dict[s] for s in sentence.split()]token_list.append(arr)batch = make_batch() # 最重要的一部分 預訓練任務的數據構建部分input_ids, segment_ids, masked_tokens, masked_pos, isNext = map(torch.LongTensor, zip(*batch))# map把函數依次作用在list中的每一個元素上,得到一個新的list并返回。注意,map不改變原list,而是返回一個新list。model = BERT()criterion = nn.CrossEntropyLoss(ignore_index=0) # 只計算mask位置的損失optimizer = optim.Adam(model.parameters(), lr=0.001)for epoch in range(100):optimizer.zero_grad()# logits_lm 語言詞表的輸出# logits_clsf 二分類的輸出# logits_lm:[batch_size, max_pred, n_vocab]logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos)## logits_lm 【6,5,29】 bs*max_pred*voca logits_clsf:[6*2]loss_lm = criterion(logits_lm.transpose(1, 2), masked_tokens) # for masked LM ;masked_tokens [6,5]loss_lm = (loss_lm.float()).mean()loss_clsf = criterion(logits_clsf, isNext) # for sentence classificationloss = loss_lm + loss_clsfif (epoch + 1) % 10 == 0:print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))loss.backward()optimizer.step()特別鳴謝:
DASOU
總結
以上是生活随笔為你收集整理的pytorch--从零实现一个BERT模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LibFuzzer学习(一):轻松找到心
- 下一篇: CCCC 天梯赛 PTA ZOJ 题目