基于nao机器人实现语音对话(智能版本)
生活随笔
收集整理的這篇文章主要介紹了
基于nao机器人实现语音对话(智能版本)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
nao機器人實現語音對話
1、語音獲取
nao耳麥有一個功能,它可以通過聲音大小判斷能力值,也就是聲音越大能量越大。所以我們此次項目主要運用的就是nao 的這個功能,來展開實現的。下面是流程圖。
- 功能流程圖
我們可以看見上面的流程圖,從錄音開始到錄音結束邏輯還是比較復雜的,而且還有一些我沒畫出來,這只是大概。 - 錄音的代碼
nao一共有四個聲道,下面的代碼是獲取聲音能量值
- 獲取能量值
上面代碼就是通過聲音能量值來判斷是否有人說話,從而判斷是否錄音的。里面邏輯需要大家自己看看,我說不太清楚。。。
2、錄音翻譯成文本
得到錄音需要用百度語音合成API將音頻轉換成文字。實現工程如下。
大家先去申請語音識別的API有了API KEY 和API 密碼之后才行。
- 代碼
3、將文本傳給華為云
這里主要是將問題傳給華為云知識庫,看是否能找到匹配的問題,比如我們做的是農業相關的,我就在華為云知識庫中添加農業相關的知識。
- 華為云機器人
- 代碼
點擊進來我們就可以看見知識庫了??梢宰约禾砑印R部梢蕴砑蛹寄?#xff0c;這個就要自己對華為云機器人熟悉了,我就不夠多的闡述。
4、傳給圖靈機器人
如果華為云知識庫沒有的問題就可以傳給圖靈機器人人了,因為圖靈機器人閑聊比華為云機器人好一些。
創建之后也可以添加問題,不過沒有華為云的好。
- 代碼
然后就可以語音把傳回來的就可以拿回來語音播報了。
5、源碼
# -*- coding:utf-8 -*- #!/usr/bin/env python import argparse from naoqi import ALProxy import wave import json import sys import os import paho.mqtt.client as mqtt import time import requests import re import json from time import sleep import random import json import sys import qi import time import tempfile import requestsfrom scipy.io import wavfile tts = audio = record = aup = None record_path = '/home/nao/record.wav' from aip import AipSpeech from urllib2 import urlopen,Request from urllib2 import URLError from urllib import urlencode reload(sys) sys.setdefaultencoding('utf-8') APP_ID = '21715692' API_KEY = 'O0gzDUHKkciBa60VddBgzuO1' SECRET_KEY = 'Psji0dC90D1OehYh63ZaQuc7UPA8soxb' username = 'h_y8689' user_demain_id = '0a37c79c8300f3840f9cc0137d392600' project_name = 'cn-north-4' project_domain_id = '0a37c81fbe00f38b2f0ac0135b8e3f93' password = 'hjy123456789' client = AipSpeech(APP_ID, API_KEY, SECRET_KEY) sys.setdefaultencoding('utf-8') global flag_two flag_two = 0 sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..')) sys.path.append("..") TASK_TOPIC = 'test' client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time())) client = mqtt.Client(client_id, transport='tcp') client.connect("59.110.42.24", 1883, 60) client.loop_start()def clicent_main(message: str):time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))payload = {"msg": "%s" % message, "data": "%s" % time_now}# publish(主題:Topic; 消息內容)client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False))print("Successful send message!")return Trueclass Audio:def __init__(self, audio_recorder, audio_device, answer_nao):self.audio_recorder = audio_recorderself.audio_device = audio_deviceself.answer_nao = answer_naoself.data_result = Nonedef recorder(self):self.audio_recorder.stopMicrophonesRecording()time.sleep(0.2)energy = self.energy()print(energy['left'])self.audio_recorder.startMicrophonesRecording(record_path, "wav", 16000, (0,0,1,0)) print("record begin")wait = 0global flagflag = 0global flag_oneflag_one = 0while 1:energy = self.energy()time.sleep(0.1) if energy['left'] < 400 and wait <5:print("no body:",energy['left'],float(wait))wait +=0.1elif energy['left'] >600:print("have people:",energy['left'],wait)wait = 4.8continueelif wait >= 5:print("record over ")self.audio_recorder.stopMicrophonesRecording()breakelse:continueif round(wait,1) == 4.0:print("int wait:",int(wait))self.answer_nao.say("你還有什么要說的嗎,沒有我要休眠咯")self.audio_recorder.stopMicrophonesRecording()time.sleep(1)self.audio_recorder.startMicrophonesRecording(record_path,"wav",16000,(0,0,1,0))wait = 2flag = 1while 1:energy = self.energy()time.sleep(0.1) if energy['left'] < 400 and wait <5:print("no body:",energy['left'],float(wait))wait +=0.1elif energy['left'] >600:print("have people:",energy['left'],wait)wait = 4.9continueelif wait >= 5:print("record over ")self.audio_recorder.stopMicrophonesRecording()breakelse:continueif round(wait,1)==4.6:self.answer_nao.say("期待與您再次相遇")self.audio_recorder.stopMicrophonesRecording()flag_two = 2returnmsg = listen()msg = str(msg)print(msg)time.sleep(1)if "沒" in msg:self.answer_nao.say("拜拜")time.sleep(2)breakelif msg == "None":self.answer_nao.say("很高心跟您對話,期待再次與您相見")time.sleep(2)breakelif msg == "":self.answer_nao.say("期待再次和您相遇,再見")time.sleep(2)breakelse:flag_one = 2botMsg = turing.botInteraction(msg)test = str(botMsg)answerNao.say(test)time.sleep(0.5)audio.recorder()if flag == 1:breakdef energy(self):energy = dict() energy['left'] = self.audio_device.getLeftMicEnergy()energy['right'] = self.audio_device.getRightMicEnergy()energy['front'] = self.audio_device.getFrontMicEnergy()energy['rear'] = self.audio_device.getRearMicEnergy()return energydef answer(self, answer_data):self.answer_nao.setLanguage("Chinese")self.answer_nao.say(answer_data)def main(session):audioRecorder = session.service('ALAudioRecorder') audioDevice = session.service('ALAudioDevice')answerNao = session.service("ALTextToSpeech") audio = Audio(audioRecorder, audioDevice, answerNao)audio.recorder()try:passexcept Exception, errorMsg:print str(errorMsg)exit()class TuringChatMode(object):def __init__(self):self.turing_url = 'http://www.tuling123.com/openapi/api?'def botInteraction (self,text):url_data = dict(key = 'e7ea86036040426e8a9d123176bfe12f',info = text,userid = 'yjc',)self.request = Request(self.turing_url + urlencode(url_data))try:w_data = urlopen(self.request)except URLError:raise Exception("No internet connection available to transfer txt data")except:raise KeyError("Server wouldn't respond (invalid key or quota has been maxed out)")response_text = w_data.read().decode('utf-8')json_result = json.loads(response_text)return json_result['text']def main(robot_IP, robot_PORT=9559):global tts, audio, record, aup tts = ALProxy("ALTextToSpeech", robot_IP, robot_PORT)record = ALProxy("ALAudioRecorder", robot_IP, robot_PORT)aup = ALProxy("ALAudioPlayer", robot_IP, robot_PORT)print 'start recording...'record.startMicrophonesRecording(record_path, 'wav', 16000, (0,0,1,0))time.sleep(6)record.stopMicrophonesRecording()print 'record over' def huawei(msg):url1 = 'https://iam.cn-north-4.myhuaweicloud.com/v3/auth/tokens'header ={'Content-Type': 'application/json;charset=utf8' }data = { "auth": { "identity": { "methods": [ "password" ], "password": { "user": { "name": "h_y8689", "password": "hjy123456789","domain": { "name": "h_y8689" } } } }, "scope": { "project": { "name": "cn-north-4" } } } }global aa = 0res1 = requests.post(url1,data=json.dumps(data),headers =header)res1 = res1.headers['X-Subject-Token']#print("token:",res1[0:10])# url2 = 'https://cbs-ext.cn-north-4.myhuaweicloud.com/v1/0a37c81fbe00f38b2f0ac0135b8e3f93/qabots/5c71f659-3bc3-4f4b-8b1c-4125fcff7233/suggestions'Request_Header = {'Content-Type': 'application/json','X-Auth-Token' :res1}url2 = 'https://cbs-ext.cn-north-4.myhuaweicloud.com/v1/0a37c81fbe00f38b2f0ac0135b8e3f93/qabots/5c71f659-3bc3-4f4b-8b1c-4125fcff7233/sessions'res_2 = requests.post(url2, headers = Request_Header)#print(res_2.text)res_2 = json.loads(res_2.text)def ques(que,res_2):url4 = 'https://cbs-ext.cn-north-4.myhuaweicloud.com/v1/0a37c81fbe00f38b2f0ac0135b8e3f93/qabots/5c71f659-3bc3-4f4b-8b1c-4125fcff7233/sessions/{}'.format(res_2['session_id'])body = {'question' : que,'top' : '1','tag_ids' : 'nao','domain_ids' : 'nao','chat_enable': 'true'}res_4 = requests.post(url4, data=json.dumps(body), headers = Request_Header)res_4 = json.loads(res_4.text)return res_4que = msgres_4 = ques(que, res_2)print(res_4)if(res_4['reply_type'] == 0):#print(float(res_4['qabot_answers']['answers'][0]['score']))if(float(res_4['qabot_answers']['answers'][0]['score']) < 0.8):a = 1print(1)url5 = 'https://cbs-ext.cn-north-4.myhuaweicloud.com/v1/0a37c81fbe00f38b2f0ac0135b8e3f93/qabots/5c71f659-3bc3-4f4b-8b1c-4125fcff7233/sessions/{}'.format(res_2['session_id'])res_5 = requests.delete(url5, headers = Request_Header)return else:print("2")answerNao.say(res_4['qabot_answers']['answers'][0]['answer'])url5 = 'https://cbs-ext.cn-north-4.myhuaweicloud.com/v1/0a37c81fbe00f38b2f0ac0135b8e3f93/qabots/5c71f659-3bc3-4f4b-8b1c-4125fcff7233/sessions/{}'.format(res_2['session_id'])res_5 = requests.delete(url5, headers = Request_Header)returnelse:print(3)a = 1#answerNao.say(res_4['chat_answers']['answer'])url5 = 'https://cbs-ext.cn-north-4.myhuaweicloud.com/v1/0a37c81fbe00f38b2f0ac0135b8e3f93/qabots/5c71f659-3bc3-4f4b-8b1c-4125fcff7233/sessions/{}'.format(res_2['session_id'])res_5 = requests.delete(url5, headers = Request_Header)returndef listen():with open(record_path, 'rb') as fp:voices = fp.read()try:result = client.asr(voices, 'wav', 16000, {'dev_pid': 1537, })result_text = result["result"][0]result_text = result_text.replace(',','')result_text = result_text.replace('.','')return result_textexcept KeyError:print("faild")if __name__ == "__main__":parser = argparse.ArgumentParser()parser.add_argument("--ip", type=str, default="192.168.1.89", help="Robot ip address")parser.add_argument("--port", type=int, default=9559, help="Robot port number")args = parser.parse_args()session = qi.Session()try:session.connect("tcp://" + args.ip + ":" + str(args.port))except RuntimeError:print("Can't connect to Naoqi at ip "" + args.ip + "" on port " + str(args.port) +"Please check your script arguments. Run with -h option for help.")sys.exit(1)turing = TuringChatMode()audioRecorder = session.service('ALAudioRecorder') audioDevice = session.service('ALAudioDevice')answerNao = session.service("ALTextToSpeech") audio = Audio(audioRecorder, audioDevice, answerNao)answerNao.setLanguage("Chinese") print("enter xunhuan")while 1:energy = audio.energy()print(energy['left'])if energy['left']>2000:answerNao.say("你好,很高興認識你")time.sleep(0.5)while 1:audio.recorder()msg = listen()msg = str(msg)if (len(msg) <= 1):breakif "拜" in msg:answerNao.say("期待下次相遇")time.sleep(1)breakif "再見" in msg:answerNao.say("期待下次相遇")time.sleep(1)breakif “開燈”in msg:clicent_main("打開")if “開燈”in msg:clicent_main("關燈")if flag == 1:breakif flag_two == 2:breakhuawei(msg)print(a)if (a != 1):continuebotMsg = turing.botInteraction(msg)test = str(botMsg)answerNao.say(test)time.sleep(0.5)總結
以上是生活随笔為你收集整理的基于nao机器人实现语音对话(智能版本)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C#系统命名空间
- 下一篇: 漏洞补丁:漏洞命名(CVE和CNNVD)