# Preparing Data
import os
import random

import pandas as pd
import torch
from tqdm import tqdm

# The next few functions are all helpers for concat_feat (frame splicing).
def load_feat(path):
    """Load the feature tensor of one utterance from ``path``.

    NOTE(review): ``torch.load`` unpickles the file, so it must only be
    pointed at trusted feature files.
    """
    return torch.load(path)


def shift(x, n):
    """Shift the rows of a 2-D tensor by ``n`` steps, padding with the edge row.

    Args:
        x: tensor of shape (seq_len, feature_dim).
        n: shift amount. ``n > 0`` moves later rows up (row ``t`` becomes the
            old row ``t + n``) and pads the tail with the last row; ``n < 0``
            moves earlier rows down and pads the head with the first row.

    Returns:
        ``x`` itself when ``n == 0`` or ``x`` has no rows, otherwise a new
        tensor with the same shape as ``x``.
    """
    seq_len = x.size(0)
    if n == 0 or seq_len == 0:
        return x

    # Clamp the shift so the result always keeps seq_len rows, even when the
    # requested shift is longer than the sequence itself.
    steps = min(abs(n), seq_len)
    if n < 0:
        left = x[0].repeat(steps, 1)
        right = x[:seq_len - steps]
    else:
        left = x[steps:]
        right = x[-1].repeat(steps, 1)
    return torch.cat((left, right), dim=0)


# A phoneme usually spans more than one frame, so training on a frame together
# with its neighbouring frames gives better results.
# The context is symmetric: e.g. concat_n = 11 takes 5 frames before and 5 after.
def concat_feat(x, concat_n):
    """Concatenate every frame with its neighbouring frames.

    Args:
        x: tensor of shape (seq_len, feature_dim).
        concat_n: total number of frames per output row; must be odd.

    Returns:
        ``x`` unchanged when ``concat_n < 2``; otherwise a tensor of shape
        (seq_len, concat_n * feature_dim) whose row ``t`` holds frames
        ``t - concat_n // 2 .. t + concat_n // 2``, with out-of-range
        positions filled by the first / last frame.
    """
    assert concat_n % 2 == 1, 'concat_n must be odd'
    if concat_n < 2:
        return x

    seq_len, feature_dim = x.size(0), x.size(1)
    # One copy of the whole sequence per context slot:
    # (concat_n, seq_len, feature_dim).
    x = x.repeat(1, concat_n)
    x = x.view(seq_len, concat_n, feature_dim).permute(1, 0, 2)

    mid = concat_n // 2
    for r_idx in range(1, mid + 1):
        # Slots right of the centre see future frames, slots left of it see
        # past frames.
        x[mid + r_idx, :] = shift(x[mid + r_idx], r_idx)
        x[mid - r_idx, :] = shift(x[mid - r_idx], -r_idx)

    return x.permute(1, 0, 2).view(seq_len, concat_n * feature_dim)


# Example:
# x = torch.tensor([[1, 2, 3],
# [4,5,6],
# [7,8,9],
#                   [10, 11, 12]])
# y = concat_feat(x, 3)
# print(y)


def _read_lines(path):
    """Return every line of the text file at ``path`` (newlines kept), closing the file."""
    with open(path) as f:
        return f.readlines()


def preprocess_data(split, feat_dir, phone_path, concat_nframes, train_ratio=0.8, train_val_seed=1337):
    """Load, splice and stack the features (and labels) of one data split.

    Args:
        split: ``'train'``, ``'val'`` or ``'test'``.
        feat_dir: directory holding the ``train`` / ``test`` feature folders
            with one ``<utterance>.pt`` file per utterance.
        phone_path: directory holding ``train_labels.txt``, ``train_split.txt``
            and ``test_split.txt``.
        concat_nframes: number of frames spliced together per sample (odd).
        train_ratio: fraction of the shuffled training utterances used for
            ``'train'``; the remainder is used for ``'val'``.
        train_val_seed: seed for the train / validation shuffle, so both
            splits are derived from the same permutation.

    Returns:
        ``(X, y)`` for ``'train'`` / ``'val'`` and ``X`` alone for ``'test'``,
        where ``X`` has shape (num_frames, 39 * concat_nframes) and ``y`` is a
        long tensor of shape (num_frames,).

    Raises:
        ValueError: if ``split`` is not one of the three supported names.
        RuntimeError: if the split holds more frames than the preallocated
            buffer.
    """
    if split not in ('train', 'val', 'test'):
        raise ValueError('Invalid \'split\' argument for dataset: PhoneDataset!')

    class_num = 41  # NOTE: pre-computed, should not need change
    # 'train' and 'val' are both cut from the training data.
    mode = 'test' if split == 'test' else 'train'
    has_labels = mode != 'test'

    label_dict = {}
    if has_labels:
        for line in _read_lines(os.path.join(phone_path, f'{mode}_labels.txt')):
            fields = line.strip('\n').split(' ')
            label_dict[fields[0]] = [int(p) for p in fields[1:]]

    if has_labels:
        # split training and validation data
        usage_list = _read_lines(os.path.join(phone_path, 'train_split.txt'))
        random.seed(train_val_seed)
        random.shuffle(usage_list)
        percent = int(len(usage_list) * train_ratio)
        usage_list = usage_list[:percent] if split == 'train' else usage_list[percent:]
    else:
        usage_list = _read_lines(os.path.join(phone_path, 'test_split.txt'))

    # One utterance id per line.
    usage_list = [line.strip('\n') for line in usage_list]
    print('[Dataset] - # phone classes: ' + str(class_num) + ', number of utterances for ' + split + ': ' + str(len(usage_list)))

    # Preallocate one large buffer and fill it utterance by utterance; it is
    # trimmed to the number of frames actually written at the end.
    max_len = 3000000
    X = torch.empty(max_len, 39 * concat_nframes)
    if has_labels:
        y = torch.empty(max_len, dtype=torch.long)

    # Read every utterance: X collects the features, y the labels.
    idx = 0
    for fname in tqdm(usage_list):
        feat = load_feat(os.path.join(feat_dir, mode, f'{fname}.pt'))
        cur_len = len(feat)
        if idx + cur_len > max_len:
            raise RuntimeError(f'{split} split exceeds the preallocated buffer of {max_len} frames')
        feat = concat_feat(feat, concat_nframes)

        X[idx: idx + cur_len, :] = feat
        if has_labels:
            y[idx: idx + cur_len] = torch.LongTensor(label_dict[fname])
        idx += cur_len

    X = X[:idx, :]
    if has_labels:
        y = y[:idx]

    print(f'[INFO] {split} set')
    print(X.shape)
    if has_labels:
        print(y.shape)
        return X, y
    return X


# Define Dataset
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader


class LibriDataset(Dataset):
    """Frame-level dataset over spliced features and optional phoneme labels.

    Args:
        X: feature container indexed by frame (a tensor of shape
            (num_frames, feature_dim) in this script).
        y: optional labels, one per frame; anything ``torch.as_tensor`` can
            convert to a long tensor. ``None`` builds an unlabeled (test)
            dataset.

    Raises:
        ValueError: if ``y`` is given but its length differs from ``X``.
    """

    def __init__(self, X, y=None):
        self.data = X
        if y is None:
            self.label = None
        else:
            self.label = torch.as_tensor(y, dtype=torch.long)
            if len(self.label) != len(self.data):
                raise ValueError(
                    f'X and y must have the same length, got {len(self.data)} and {len(self.label)}'
                )

    def __getitem__(self, idx):
        """Return ``(feature, label)`` when labels exist, else just ``feature``."""
        if self.label is None:
            return self.data[idx]
        return self.data[idx], self.label[idx]

    def __len__(self):
        """Return the number of frames."""
        return len(self.data)


# Define Model
import torch
import torch.nn as nn
import torch.nn.functional as F


class BasicBlock(nn.Module):
    """Linear -> ReLU -> BatchNorm1d -> Dropout building block.

    Args:
        input_dim: size of each input sample.
        output_dim: size of each output sample.
        dropout: dropout probability applied after batch normalisation.
    """

    def __init__(self, input_dim, output_dim, dropout=0.35):
        super().__init__()
        self.block = nn.Sequential(
            nn.Linear(input_dim, output_dim),
            nn.ReLU(),
            # BatchNorm1d arguments (all at their library defaults here):
            #   num_features: C from an input of size (N, C, L), or L from an
            #                 input of size (N, L)
            #   eps:          value added to the denominator for numerical
            #                 stability
            #   momentum:     momentum of the running mean / variance
            #   affine:       when True the layer has learnable affine
            #                 parameters
            nn.BatchNorm1d(output_dim, eps=1e-05, momentum=0.1, affine=True),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        """Apply the block to a batch ``x`` and return the result."""
        return self.block(x)


class Classifier(nn.Module):
    """Fully connected classifier built from ``BasicBlock`` layers.

    The network is one input block, ``hidden_layers`` hidden blocks and a
    final linear layer that produces ``output_dim`` logits.

    Args:
        input_dim: size of each input sample.
        output_dim: number of classes.
        hidden_layers: number of hidden ``BasicBlock`` layers after the input
            block.
        hidden_dim: width of every hidden layer.
        dropout: dropout probability used inside every ``BasicBlock``.
    """

    def __init__(self, input_dim, output_dim=41, hidden_layers=1, hidden_dim=256, dropout=0.35):
        super().__init__()
        self.fc = nn.Sequential(
            BasicBlock(input_dim, hidden_dim, dropout),
            *[BasicBlock(hidden_dim, hidden_dim, dropout) for _ in range(hidden_layers)],
            nn.Linear(hidden_dim, output_dim),
        )

    def forward(self, x):
        """Return the class logits for a batch ``x``."""
        return self.fc(x)


# Hyper-parameters
## Hyper-parameters

# ---- data parameters ----
# Number of frames spliced together per sample; n must be odd
# (2k + 1 = n frames in total).
concat_nframes = 19
# Share of the data used for training; the rest is used for validation.
#
# Train / validation split for a dataset with millions of samples:
# a common heuristic is to hold out 30% of the data as the test set, which
# suits moderately sized datasets (say 100 to 10,000 samples). With big data
# the proportions shift: around one million samples, 98% (train) / 1% (val) /
# 1% (test); beyond a million, 95% (train) / 2.5% (val) / 2.5% (test).
# - "Machine Learning Yearning", Andrew Ng
train_ratio = 0.95

# ---- training parameters ----
seed = 0  # random seed
batch_size = 1024  # batch size (512)
num_epoch = 100  # the number of training epoch
learning_rate = 1e-4  # learning rate
# The path where the checkpoint will be saved.
model_path = './model.ckpt'

# ---- model parameters ----
# The input dim of the model; you should not change the value.
input_dim = concat_nframes * 39
hidden_layers = 2  # the number of hidden layers
hidden_dim = 1024  # the hidden dim

# (gc, imported below, provides the call used to trigger garbage collection.)
## Prepare dataset and model
import gc

# Dataset locations. Raw strings keep the Windows backslashes literal; the
# previous non-raw literals relied on invalid escape sequences such as "\k",
# which newer Python versions warn about.
libriphone_dir = r'F:\kaggle\HW2\libriphone\libriphone'
libriphone_feat_dir = r'F:\kaggle\HW2\libriphone\libriphone\feat'

# preprocess data
train_X, train_y = preprocess_data(
    split='train',
    feat_dir=libriphone_feat_dir,
    phone_path=libriphone_dir,
    concat_nframes=concat_nframes,
    train_ratio=train_ratio,
)
val_X, val_y = preprocess_data(
    split='val',
    feat_dir=libriphone_feat_dir,
    phone_path=libriphone_dir,
    concat_nframes=concat_nframes,
    train_ratio=train_ratio,
)

# get dataset
train_set = LibriDataset(train_X, train_y)
val_set = LibriDataset(val_X, val_y)

# Drop the module-level references to the raw tensors; the datasets hold
# what they need.
del train_X, train_y, val_X, val_y
gc.collect()

# get dataloader
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
# Use the first GPU when one is available, otherwise fall back to the CPU.
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
print(f'DEVICE: {device}')

import numpy as np


# fix seed
def same_seeds(seed):
    """Seed every random number generator this script relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and, when available,
    all CUDA devices) and switches cuDNN to deterministic mode.

    Args:
        seed: integer seed shared by all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    # Turn off cuDNN auto-tuning and request deterministic kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


# fix random seed
same_seeds(seed)

# create model, define a loss function, and optimizer
# Build the network on the selected device.
model = Classifier(
    input_dim=input_dim,
    hidden_layers=hidden_layers,
    hidden_dim=hidden_dim,
).to(device)

# Cross-entropy loss on the logits returned by the model.
criterion = nn.CrossEntropyLoss()

# AdamW with weight decay.
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=learning_rate,
    weight_decay=0.01,
)

# Cosine annealing with warm restarts: the first cycle lasts 8 scheduler
# steps, every following cycle is twice as long, and the learning rate never
# drops below half of its initial value.
scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer,
    T_0=8,
    T_mult=2,
    eta_min=learning_rate / 2,
)
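# import torchsummary
# torchsummary.summary(model, input_size=(input_dim,))
# torchsummary provides a more detailed analysis: per-module information (the
# type, output shape and parameter count of every layer), the total number of
# parameters of the model, the model size, and the memory needed for one
# forward or backward pass.
# `ncols` sets the width of the tqdm progress-bar output.

## Training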
best_acc = 0.0  # best number of correctly classified validation frames so far
early_stop_count = 0  # consecutive epochs without validation improvement
early_stopping = 8  # stop once this many epochs pass without improvement

for epoch in range(num_epoch):
    train_acc = 0.0
    train_loss = 0.0
    val_acc = 0.0
    val_loss = 0.0

    # training
    model.train()  # set the model to training mode
    pbar = tqdm(train_loader, ncols=110)  # progress bar for this epoch
    pbar.set_description(f'T: {epoch + 1}/{num_epoch}')
    samples = 0
    for i, batch in enumerate(pbar):
        features, labels = batch
        features = features.to(device)
        labels = labels.to(device)

        # Clear the gradients left over from the previous step, then run
        # forward pass, loss, backward pass and parameter update.
        # Background on these calls:
        # https://blog.csdn.net/PanYHHH/article/details/107361827
        optimizer.zero_grad()
        outputs = model(features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # get the index of the class with the highest probability
        _, train_pred = torch.max(outputs, 1)
        # .item() converts the one-element count tensor to a Python number.
        correct = (train_pred.detach() == labels.detach()).sum().item()
        train_acc += correct
        samples += labels.size(0)
        train_loss += loss.item()

        lr = optimizer.param_groups[0]["lr"]
        # Values shown next to the progress bar.
        pbar.set_postfix({
            'lr': lr,
            'batch acc': correct / labels.size(0),
            'acc': train_acc / samples,
            'loss': train_loss / (i + 1),
        })

    # Advance the learning-rate schedule once per epoch.
    # NOTE(review): the original layout was lost, so per-epoch stepping is
    # assumed here (T_0=8 then means an 8-epoch first cycle) - confirm.
    scheduler.step()
    pbar.close()

    # validation
    if len(val_set) > 0:
        # Evaluation mode: BatchNorm uses its running statistics and Dropout
        # is disabled.
        model.eval()
        with torch.no_grad():
            pbar = tqdm(val_loader, ncols=110)
            pbar.set_description(f'V: {epoch + 1}/{num_epoch}')
            samples = 0
            for i, batch in enumerate(pbar):
                features, labels = batch
                features = features.to(device)
                labels = labels.to(device)
                outputs = model(features)
                loss = criterion(outputs, labels)

                # torch.max(input, dim) returns (values, indices); dim=1
                # takes the maximum over the classes of every row.
                _, val_pred = torch.max(outputs, 1)
                val_acc += (val_pred == labels).sum().item()
                samples += labels.size(0)
                val_loss += loss.item()
                pbar.set_postfix({
                    'val acc': val_acc / samples,
                    'val loss': val_loss / (i + 1),
                })
            pbar.close()

        # Save a checkpoint whenever the model improves on the validation set.
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), model_path)
            print('saving model with acc {:.3f}'.format(best_acc / len(val_set)))
            early_stop_count = 0
        else:
            early_stop_count += 1
            if early_stop_count >= early_stopping:
                print(f"Epoch: {epoch + 1}, model not improving, early stopping.")
                break
    else:
        # NOTE(review): placeholder message printed when there is no
        # validation set; consider replacing it with a training summary.
        print('i dont know')

# if not validating, save the last epoch
if len(val_set) == 0:
    torch.save(model.state_dict(), model_path)
    print('saving model at last epoch')
# As before, release what is no longer needed.
del train_loader, val_loader
gc.collect()

## Testing
# Create a testing dataset to produce the predictions the task asks for, and
# load the model from the saved checkpoint (the best model found in training).

# Dataset locations. Raw strings keep the Windows backslashes literal instead
# of relying on invalid escape sequences such as "\k".
libriphone_dir = r'F:\kaggle\HW2\libriphone\libriphone'
libriphone_feat_dir = r'F:\kaggle\HW2\libriphone\libriphone\feat'

# load data
test_X = preprocess_data(
    split='test',
    feat_dir=libriphone_feat_dir,
    phone_path=libriphone_dir,
    concat_nframes=concat_nframes,
)
test_set = LibriDataset(test_X, None)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

# load model
model = Classifier(input_dim=input_dim, hidden_layers=hidden_layers, hidden_dim=hidden_dim).to(device)
# map_location lets a checkpoint written on a GPU be restored on a CPU-only
# machine as well.
model.load_state_dict(torch.load(model_path, map_location=device))

# Make prediction.
# NOTE(review): test_acc and test_lengths are not used below; they are kept
# in case code outside this section reads them.
test_acc = 0.0
test_lengths = 0

# Predict batch by batch and join the results once at the end, instead of
# re-copying the whole prediction array on every batch.
batch_preds = []
model.eval()
with torch.no_grad():
    for batch in tqdm(test_loader):
        features = batch.to(device)
        outputs = model(features)
        # get the index of the class with the highest probability
        _, test_pred = torch.max(outputs, 1)
        batch_preds.append(test_pred.cpu().numpy())

if batch_preds:
    pred = np.concatenate(batch_preds, axis=0)
else:
    pred = np.array([], dtype=np.int32)

# Write one "Id,Class" row per test frame.
with open('prediction.csv', 'w') as f:
    f.write('Id,Class\n')
    f.writelines('{},{}\n'.format(i, y) for i, y in enumerate(pred))
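# Reference notes: class torch.nn.LSTM(*args, **kwargs)
#   input_size:    feature dimension of the input x
#   hidden_size:   feature dimension of the hidden state
#   num_layers:    number of stacked LSTM layers, default 1
#   bias:          default True
#   batch_first:   if True, input and output tensors use the layout
#                  (batch, seq, feature)
#   dropout:       dropout applied to the output of every layer except the
#                  last one, default 0
#   bidirectional: if True the LSTM is bidirectional, default False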