基于PaddlePaddle实现声音分类
前言
本章我們來介紹如何使用PaddlePaddle訓練一個區分不同音頻的分類模型,例如你有這樣一個需求,需要根據不同的鳥叫聲識別是什么種類的鳥,這時你就可以使用這個方法來實現你的需求了。
源碼地址:https://github.com/yeyupiaoling/AudioClassification_PaddlePaddle
環境準備
主要介紹libsora,PyAudio,pydub的安裝,其他的依賴包根據需要自行安裝。
- Python 3.7
- PaddlePaddle 2.2.2
安裝libsora
最簡單的方式就是使用pip命令安裝,如下:
pip install pytest-runner pip install librosa==0.9.1注意: 如果pip命令安裝不成功,那就使用源碼安裝,下載源碼:https://github.com/librosa/librosa/releases/, windows的可以下載zip壓縮包,方便解壓。
pip install pytest-runner tar xzf librosa-<版本號>.tar.gz 或者 unzip librosa-<版本號>.tar.gz cd librosa-<版本號>/ python setup.py install安裝ffmpeg, 下載地址:http://blog.gregzaal.com/how-to-install-ffmpeg-on-windows/,筆者下載的是64位,static版。
然后到C盤,筆者解壓,修改文件名為 ffmpeg,存放在 C:\Program Files\目錄下,并添加環境變量 C:\Program Files\ffmpeg\bin
最后修改源碼,路徑為 C:\Python3.7\Lib\site-packages\audioread\ffdec.py,修改32行代碼,如下:
COMMANDS = ('C:\\Program Files\\ffmpeg\\bin\\ffmpeg.exe', 'avconv')安裝PyAudio
使用pip安裝命令,如下:
pip install pyaudio在安裝的時候需要使用到C++庫進行編譯,如果讀者的系統是windows,Python是3.7,可以在這里下載whl安裝包,下載地址:https://github.com/intxcc/pyaudio_portaudio/releases
安裝pydub
使用pip命令安裝,如下:
pip install pydub訓練分類模型
把音頻轉換成訓練數據最重要的是使用了librosa,使用librosa可以很方便得到音頻的梅爾頻譜(Mel Spectrogram),使用的API為 librosa.feature.melspectrogram(),輸出的是numpy值,可以直接用tensorflow訓練和預測。關于梅爾頻譜具體信息讀者可以自行了解,跟梅爾頻譜同樣很重要的梅爾倒譜(MFCCs)更多用于語音識別中,對應的API為 librosa.feature.mfcc()。同樣以下的代碼,就可以獲取到音頻的梅爾頻譜。
wav, sr = librosa.load(data_path, sr=16000) features = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=400, n_mels=80, hop_length=160, win_length=400) features = librosa.power_to_db(features, ref=1.0, amin=1e-10, top_db=None)生成數據列表
生成數據列表,用于下一步的讀取需要,audio_path為音頻文件路徑,用戶需要提前把音頻數據集存放在dataset/audio目錄下,每個文件夾存放一個類別的音頻數據,每條音頻數據長度在3秒以上,如 dataset/audio/鳥叫聲/······。audio是數據列表存放的位置,生成的數據類別的格式為 音頻路徑\t音頻對應的類別標簽,音頻路徑和標簽用制表符 \t分開。讀者也可以根據自己存放數據的方式修改以下函數。
Urbansound8K 是目前應用較為廣泛的用于自動城市環境聲分類研究的公共數據集,包含10個分類:空調聲、汽車鳴笛聲、兒童玩耍聲、狗叫聲、鉆孔聲、引擎空轉聲、槍聲、手提鉆、警笛聲和街道音樂聲。數據集下載地址:https://zenodo.org/record/1203745/files/UrbanSound8K.tar.gz。以下是針對Urbansound8K生成數據列表的函數。如果讀者想使用該數據集,請下載并解壓到 dataset目錄下,把生成數據列表代碼改為以下代碼。
# 生成數據列表 def get_data_list(audio_path, list_path):sound_sum = 0audios = os.listdir(audio_path)f_train = open(os.path.join(list_path, 'train_list.txt'), 'w')f_test = open(os.path.join(list_path, 'test_list.txt'), 'w')for i in range(len(audios)):sounds = os.listdir(os.path.join(audio_path, audios[i]))for sound in sounds:if '.wav' not in sound:continuesound_path = os.path.join(audio_path, audios[i], sound)t = librosa.get_duration(filename=sound_path)# 過濾小于2.1秒的音頻if t >= 2.1:if sound_sum % 100 == 0:f_test.write('%s\t%d\n' % (sound_path, i))else:f_train.write('%s\t%d\n' % (sound_path, i))sound_sum += 1print("Audio:%d/%d" % (i + 1, len(audios)))f_test.close()f_train.close()if __name__ == '__main__':get_data_list('dataset/UrbanSound8K/audio', 'dataset')創建 reader.py用于在訓練時讀取數據。編寫一個 CustomDataset類,用讀取上一步生成的數據列表。
class CustomDataset(Dataset):def __init__(self, data_list_path, mode='train', sr=16000, chunk_duration=3):super(CustomDataset, self).__init__()# 當預測時不需要獲取數據if data_list_path is not None:with open(data_list_path, 'r') as f:self.lines = f.readlines()self.mode = modeself.sr = srself.chunk_duration = chunk_durationdef __getitem__(self, idx):try:audio_path, label = self.lines[idx].replace('\n', '').split('\t')# 加載并預處理音頻features = load_audio(audio_path, mode=self.mode, sr=self.sr, chunk_duration=self.chunk_duration)return features, np.array(int(label), dtype=np.int64)except Exception as ex:print(f"[{datetime.now()}] 數據: {self.lines[idx]} 出錯,錯誤信息: {ex}", file=sys.stderr)rnd_idx = np.random.randint(self.__len__())return self.__getitem__(rnd_idx)def __len__(self):return len(self.lines)下面是在訓練時或者測試時讀取音頻數據,訓練時對轉換的梅爾頻譜數據隨機裁剪,如果是測試,就取前面的,最好要執行歸一化。
def load_audio(audio_path, mode='train', sr=16000, chunk_duration=3):# 讀取音頻數據wav, sr_ret = librosa.load(audio_path, sr=sr)if mode == 'train':# 隨機裁剪num_wav_samples = wav.shape[0]# 數據太短不利于訓練if num_wav_samples < sr:raise Exception(f'音頻長度不能小于1s,實際長度為:{(num_wav_samples / sr):.2f}s')num_chunk_samples = int(chunk_duration * sr)if num_wav_samples > num_chunk_samples + 1:start = random.randint(0, num_wav_samples - num_chunk_samples - 1)stop = start + num_chunk_sampleswav = wav[start:stop]# 對每次都滿長度的再次裁剪if random.random() > 0.5:wav[:random.randint(1, sr // 2)] = 0wav = wav[:-random.randint(1, sr // 2)]elif mode == 'eval':# 為避免顯存溢出,只裁剪指定長度num_wav_samples = wav.shape[0]num_chunk_samples = int(chunk_duration * sr)if num_wav_samples > num_chunk_samples + 1:wav = wav[:num_chunk_samples]features = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=400, n_mels=80, hop_length=160, win_length=400)features = librosa.power_to_db(features, ref=1.0, amin=1e-10, top_db=None)# 歸一化mean = np.mean(features, 0, keepdims=True)std = np.std(features, 0, keepdims=True)features = (features - mean) / (std + 1e-5)features = features.astype('float32')return features訓練
接著就可以開始訓練模型了,創建 train.py。通過把音頻數據轉換成梅爾頻譜。然后定義優化方法和獲取訓練和測試數據。要注意 args.num_classes參數的值,這個是類別的數量,要根據你數據集中的分類數量來修改。
def train(args):# 設置支持多卡訓練if len(args.gpus.split(',')) > 1:dist.init_parallel_env()# 獲取數據train_dataset = CustomDataset(args.train_list_path, mode='train')# 設置支持多卡訓練if len(args.gpus.split(',')) > 1:train_batch_sampler = paddle.io.DistributedBatchSampler(train_dataset, batch_size=args.batch_size, shuffle=True)else:train_batch_sampler = paddle.io.BatchSampler(train_dataset, batch_size=args.batch_size, shuffle=True)train_loader = DataLoader(dataset=train_dataset,batch_sampler=train_batch_sampler,collate_fn=collate_fn,num_workers=args.num_workers)# 測試數據test_dataset = CustomDataset(args.test_list_path, mode='eval')test_loader = DataLoader(dataset=test_dataset,batch_size=args.batch_size,collate_fn=collate_fn,num_workers=args.num_workers)# 獲取分類標簽with open(args.label_list_path, 'r', encoding='utf-8') as f:lines = f.readlines()class_labels = [l.replace('\n', '') for l in lines]# 獲取模型model = EcapaTdnn(num_classes=args.num_classes)if dist.get_rank() == 0:paddle.summary(model, input_size=(1, 80, 98))# 設置支持多卡訓練if len(args.gpus.split(',')) > 1:model = paddle.DataParallel(model)# 學習率衰減scheduler = paddle.optimizer.lr.StepDecay(learning_rate=args.learning_rate, step_size=10, gamma=0.8, verbose=True)# 設置優化方法optimizer = paddle.optimizer.Adam(parameters=model.parameters(),learning_rate=scheduler,weight_decay=paddle.regularizer.L2Decay(5e-4))# 恢復訓練last_epoch = 0if args.resume is not None:model.set_state_dict(paddle.load(os.path.join(args.resume, 'model.pdparams')))optimizer_state = paddle.load(os.path.join(args.resume, 'optimizer.pdopt'))optimizer.set_state_dict(optimizer_state)# 獲取預訓練的epoch數last_epoch = optimizer_state['LR_Scheduler']['last_epoch']print(f'成功加載第 {last_epoch} 輪的模型參數和優化方法參數')# 獲取損失函數loss = paddle.nn.CrossEntropyLoss()最后執行訓練,每100個batch打印一次訓練日志,訓練一輪之后執行測試和保存模型,在測試時,把每個batch的輸出都統計,最后求平均值。保存的模型為預測模型,方便之后的預測使用。
# 獲取損失函數loss = paddle.nn.CrossEntropyLoss()# 開始訓練for epoch in range(last_epoch, args.num_epoch):loss_sum = []accuracies = []for batch_id, (spec_mag, label) in enumerate(train_loader()):output = model(spec_mag)# 計算損失值los = loss(output, label)los.backward()optimizer.step()optimizer.clear_grad()# 計算準確率label = paddle.reshape(label, shape=(-1, 1))acc = accuracy(input=paddle.nn.functional.softmax(output), label=label)accuracies.append(acc.numpy()[0])loss_sum.append(los)# 多卡訓練只使用一個進程打印if batch_id % 100 == 0 and dist.get_rank() == 0:print(f'[{datetime.now()}] Train epoch [{epoch}/{args.num_epoch}], batch: {batch_id}/{len(train_loader)}, 'f'lr: {scheduler.get_lr():.8f}, loss: {sum(loss_sum) / len(loss_sum):.8f}, 'f'accuracy: {sum(accuracies) / len(accuracies):.8f}')每輪訓練結束之后都會執行一次評估,和保存模型。評估會出來輸出準確率,還保存了混合矩陣圖片,如下。
預測
在訓練結束之后,我們得到了一個模型參數文件,我們使用這個模型預測音頻,在執行預測之前,需要把音頻轉換為梅爾頻譜數據,最后輸出的結果即為預測概率最大的標簽。
parser = argparse.ArgumentParser(description=__doc__) add_arg = functools.partial(add_arguments, argparser=parser) add_arg('audio_path', str, 'dataset/UrbanSound8K/audio/fold5/156634-5-2-5.wav', '音頻路徑') add_arg('num_classes', int, 10, '分類的類別數量') add_arg('label_list_path', str, 'dataset/label_list.txt', '標簽列表路徑') add_arg('model_path', str, 'models/model.pdparams', '模型保存的路徑') args = parser.parse_args()# 獲取分類標簽 with open(args.label_list_path, 'r', encoding='utf-8') as f:lines = f.readlines() class_labels = [l.replace('\n', '') for l in lines] # 獲取模型 model = EcapaTdnn(num_classes=args.num_classes) model.set_state_dict(paddle.load(args.model_path)) model.eval()def infer():data = load_audio(args.audio_path, mode='infer')data = data[np.newaxis, :]data = paddle.to_tensor(data, dtype='float32')# 執行預測output = model(data)result = paddle.nn.functional.softmax(output).numpy()# 顯示圖片并輸出結果最大的labellab = np.argsort(result)[0][-1]print(f'音頻:{args.audio_path} 的預測結果標簽為:{class_labels[lab]}')if __name__ == '__main__':infer()其他
為了方便讀取錄制數據和制作數據集,這里提供了兩個程序,首先是 record_audio.py,這個用于錄制音頻,錄制的音頻幀率為44100,通道為1,16bit。
import pyaudio import wave import uuid from tqdm import tqdm import oss = input('請輸入你計劃錄音多少秒:')CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 44100 RECORD_SECONDS = int(s) WAVE_OUTPUT_FILENAME = "save_audio/%s.wav" % str(uuid.uuid1()).replace('-', '')p = pyaudio.PyAudio()stream = p.open(format=FORMAT,channels=CHANNELS,rate=RATE,input=True,frames_per_buffer=CHUNK)print("開始錄音, 請說話......")frames = []for i in tqdm(range(0, int(RATE / CHUNK * RECORD_SECONDS))):data = stream.read(CHUNK)frames.append(data)print("錄音已結束!")stream.stop_stream() stream.close() p.terminate()if not os.path.exists('save_audio'):os.makedirs('save_audio')wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close()print('文件保存在:%s' % WAVE_OUTPUT_FILENAME) os.system('pause')創建 crop_audio.py,在訓練是只是裁剪前面的2.97秒的音頻,所以我們要把錄制的硬盤安裝每3秒裁剪一段,把裁剪后音頻存放在音頻名稱命名的文件夾中。最后把這些文件按照訓練數據的要求創建數據列表和訓練數據。
import os import uuid import wave from pydub import AudioSegment# 按秒截取音頻 def get_part_wav(sound, start_time, end_time, part_wav_path):save_path = os.path.dirname(part_wav_path)if not os.path.exists(save_path):os.makedirs(save_path)start_time = int(start_time) * 1000end_time = int(end_time) * 1000word = sound[start_time:end_time]word.export(part_wav_path, format="wav")def crop_wav(path, crop_len):for src_wav_path in os.listdir(path):wave_path = os.path.join(path, src_wav_path)print(wave_path[-4:])if wave_path[-4:] != '.wav':continuefile = wave.open(wave_path)# 幀總數a = file.getparams().nframes# 采樣頻率f = file.getparams().framerate# 獲取音頻時間長度t = int(a / f)print('總時長為 %d s' % t)# 讀取語音sound = AudioSegment.from_wav(wave_path)for start_time in range(0, t, crop_len):save_path = os.path.join(path, os.path.basename(wave_path)[:-4], str(uuid.uuid1()) + '.wav')get_part_wav(sound, start_time, start_time + crop_len, save_path)if __name__ == '__main__':crop_len = 6crop_wav('save_audio', crop_len)創建 infer_record.py,這個程序是用來不斷進行錄音識別,我們可以大致理解為這個程序在實時錄音識別。通過這個應該我們可以做一些比較有趣的事情,比如把麥克風放在小鳥經常來的地方,通過實時錄音識別,一旦識別到有鳥叫的聲音,如果你的數據集足夠強大,有每種鳥叫的聲音數據集,這樣你還能準確識別是那種鳥叫。如果識別到目標鳥類,就啟動程序,例如拍照等等。
parser = argparse.ArgumentParser(description=__doc__) add_arg = functools.partial(add_arguments, argparser=parser) add_arg('num_classes', int, 10, '分類的類別數量') add_arg('label_list_path', str, 'dataset/label_list.txt', '標簽列表路徑') add_arg('model_path', str, 'models/model.pdparams', '模型保存的路徑') args = parser.parse_args()# 獲取分類標簽 with open(args.label_list_path, 'r', encoding='utf-8') as f:lines = f.readlines() class_labels = [l.replace('\n', '') for l in lines] # 獲取模型 model = EcapaTdnn(num_classes=args.num_classes) model.set_state_dict(paddle.load(args.model_path)) model.eval()# 錄音參數 CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 16000 RECORD_SECONDS = 3 WAVE_OUTPUT_FILENAME = "infer_audio.wav"# 打開錄音 p = pyaudio.PyAudio() stream = p.open(format=FORMAT,channels=CHANNELS,rate=RATE,input=True,frames_per_buffer=CHUNK)# 獲取錄音數據 def record_audio():print("開始錄音......")frames = []for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):data = stream.read(CHUNK)frames.append(data)print("錄音已結束!")wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')wf.setnchannels(CHANNELS)wf.setsampwidth(p.get_sample_size(FORMAT))wf.setframerate(RATE)wf.writeframes(b''.join(frames))wf.close()return WAVE_OUTPUT_FILENAME# 預測 def infer(audio_path):data = load_audio(audio_path, mode='infer')data = data[np.newaxis, :]data = paddle.to_tensor(data, dtype='float32')# 執行預測output = model(data)result = paddle.nn.functional.softmax(output).numpy()# 顯示圖片并輸出結果最大的labellab = np.argsort(result)[0][-1]return class_labels[lab]if __name__ == '__main__':try:while True:# 加載數據audio_path = record_audio()# 獲取預測結果label = infer(audio_path)print(f'預測的標簽為:{label}')except Exception as e:print(e)stream.stop_stream()stream.close()p.terminate()總結
以上是生活随笔為你收集整理的基于PaddlePaddle实现声音分类的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python基础教程-菜鸟教程学习笔记1
- 下一篇: Oracle Database的安装环境