大话ion系列(三)
點擊上方“LiveVideoStack”關注我們
作者 | 王朋闖
本文為王朋闖老師創作的系列ion文章,LiveVideoStack已獲得授權發布,未來將持續更新。
大話ion系列(一)
大話ion系列(二)
五、offer與answer流程
1.前言
之前的文章已經介紹了前兩次重協商:
客戶端sdk的pub的dc已經打通,此時使用dc控制simulcast和監聽audiolevel speaker。
客戶端sdk的sub訂閱到了房間內的流。
接下來,SDK推流是第三次協商了。
2.offer流程
當點擊ion-sfu的demo里“publish”按鈕的時候,就會觸發ion-sdk-js的操作:
把音視頻track增加到pub的pc,此時會觸發onNegotiationNeeded。
首先來看一下ion-sdk-js的代碼:
this.transports[Role.pub].pc.onnegotiationneeded = this.onNegotiationNeeded.bind(this);這里把onNegotiationNeeded綁定到了pc.onnegotiationneeded,意思是當推流增加track到pc時,就會觸發onNegotiationNeeded。
參考:
https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/onnegotiationneeded
接下來看一下onNegotiationNeeded,是個標準的重協商流程。
private async onNegotiationNeeded() {if (!this.transports) {throw Error(ERR_NO_SESSION);}let offer: RTCSessionDescriptionInit | undefined;let answer: RTCSessionDescriptionInit | undefined;try {offer = await this.transports[Role.pub].pc.createOffer();await this.transports[Role.pub].pc.setLocalDescription(offer);answer = await this.signal.offer(offer);//在這里發送offer到SFUawait this.transports[Role.pub].pc.setRemoteDescription(answer);} catch (err) {/* tslint:disable-next-line:no-console */console.error(err);if (this.onerrnegotiate) this.onerrnegotiate(Role.pub, err, offer, answer);}}接下來看一下SFU里的處理:
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { .... case "offer":var negotiation Negotiationerr := json.Unmarshal(*req.Params, &negotiation)if err != nil {p.Logger.Error(err, "connect: error parsing offer")replyError(err)break}//調用peerLocal.Answer()answer, err := p.Answer(negotiation.Desc)if err != nil {replyError(err)break}// 發送answer_ = conn.Reply(ctx, req.ID, answer) func (p *PeerLocal) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription, error) { ....//這里調用了publisher.Answer()answer, err := p.publisher.Answer(sdp)if err != nil {return nil, fmt.Errorf("error creating answer: %v", err)}Logger.V(0).Info("PeerLocal send answer", "peer_id", p.id)return &answer, nil } // 這里可以看到Publisher.Answer就是標準的協商流程 // 在前邊的文章詳細介紹過什么叫協商和重協商,這里不在重復了 func (p *Publisher) Answer(offer webrtc.SessionDescription) (webrtc.SessionDescription, error) {if err := p.pc.SetRemoteDescription(offer); err != nil {return webrtc.SessionDescription{}, err}for _, c := range p.candidates {if err := p.pc.AddICECandidate(c); err != nil {Logger.Error(err, "Add publisher ice candidate to peer err", "peer_id", p.id)}}p.candidates = nilanswer, err := p.pc.CreateAnswer(nil)if err != nil {return webrtc.SessionDescription{}, err}if err := p.pc.SetLocalDescription(answer); err != nil {return webrtc.SessionDescription{}, err}return answer, nil }3.answer流程
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { ....case "answer":var negotiation Negotiationerr := json.Unmarshal(*req.Params, &negotiation)if err != nil {p.Logger.Error(err, "connect: error parsing offer")replyError(err)break}// peerLocal.SetRemoteDescriptionerr = p.SetRemoteDescription(negotiation.Desc)if err != nil {replyError(err)}func (p *PeerLocal) SetRemoteDescription(sdp webrtc.SessionDescription) error {if p.subscriber == nil {return ErrNoTransportEstablished}p.Lock()defer p.Unlock()Logger.V(0).Info("PeerLocal got answer", "peer_id", p.id)// 這里調用subscriber.SetRemoteDescriptionif err := p.subscriber.SetRemoteDescription(sdp); err != nil {return fmt.Errorf("setting remote description: %w", err)}p.remoteAnswerPending = falseif p.negotiationPending {//這里兩個標志位是為了防止重協商競爭沖突p.negotiationPending = falsep.subscriber.negotiate()}return nil }//這里僅僅是調用pc.SetRemoteDescription func (s *Subscriber) SetRemoteDescription(desc webrtc.SessionDescription) error {if err := s.pc.SetRemoteDescription(desc); err != nil {Logger.Error(err, "SetRemoteDescription error")return err} .....return nil }至此,第三次重協商完成,兩端交換完sdp,接下來ice打通后會推流過來。
4.總結
pub在第一次協商后,只打通了dc,此時使用dc控制simulcast和監聽audiolevel speaker,也可以定制自己的dc。
這樣的好處是靈活。
sub在第二次協商后,可以訂閱到房間內的其他人的流了。
pub在第三次協商時,是增加音視頻track后,然后走標準重協商流程,開始推流。
六、包的收發流程
1.前言
本文從ion-sfu中的demo點擊“publish”開始,講一下如何收包轉發。
前邊講到,點擊“publish”,會進行第三次重協商,協商完成,客戶端此時推流到SFU。
此時會觸發OnTrack,這里的OnTrack和標準webrtc接口是一樣的,會在流到達時自動觸發。
參考:
https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/ontrack
2.收包流程
OnTrack收包流程的入口
首先貼一下老代碼,OnTrack是在NewPublisher里邊設置的回調。
func NewPublisher(idstring,sessionSession,cfg*WebRTCTransportConfig)(*Publisher,error){ 。。。//這里要注意cfg.Setting,里邊的bufferFactory已經設置好了為自定義的c.BufferFactory.GetOrNew//可以搜一下這個函數NewWebRTCTransportConfig,這一行“se.BufferFactory = c.BufferFactory.GetOrNew”api:=webrtc.NewAPI(webrtc.WithMediaEngine(me),webrtc.WithSettingEngine(cfg.Setting))pc,err:=api.NewPeerConnection(cfg.Configuration) 。。。p:=&Publisher{id: id,pc: pc,cfg: cfg,router: newRouter(id,session,cfg),session:session,} //===========OnTrack在這里pc.OnTrack(func(track*webrtc.TrackRemote,receiver*webrtc.RTPReceiver){ 。。。//這里AddReceiver會新建WebRTCReceiver,然后AddUpTrack//uptrack是收流的,downtrack是發流的r,pub:=p.router.AddReceiver(receiver,track)if pub{//這里會把流發布到房間內,其他peer會訂閱到p.session.Publish(p.router,r)p.mu.Lock()publisherTrack:=PublisherTrack{track,r,true}p.tracks=append(p.tracks,publisherTrack) 。。。p.mu.Unlock()if handler,ok:=p.onPublisherTrack.Load().(func(PublisherTrack));ok&&handler!=nil{//這里如果上層業務,通過OnPublisherTrack設置了回調,就會觸發//一般只有包導入的情況下,才會這樣用,比如業務不想加入房間就自動訂閱,想要按需訂閱handler(publisherTrack)}} else {p.mu.Lock()p.tracks=append(p.tracks,PublisherTrack{track,r,false})p.mu.Unlock()}})//===========自定義buffer
這里不得不介紹一下自定義buffer了,看懂了才知道包是從哪里來的。
Pion/webrtc支持自定義BufferFactory,設置好之后,pion/webrtc的組件會使用自定義buffer。
比如pion/srtp是實際收發srtp和srtcp包的類,它們也會使用自定義buffer。
首先來看一下ion-sfu是在哪里設置自定義buffer的:
func NewWebRTCTransportConfig(cConfig)WebRTCTransportConfig{//這個SettingEngine是pion里很重要的設置類,可以控制pion/webrtc很多行為和參數,比如ice-lite等se:=webrtc.SettingEngine{}se.DisableMediaEngineCopy(true)....//這里把自定義的BufferFactory給配置進去了//意思是pion/srtp會使用這個buffer來傳包se.BufferFactory=c.BufferFactory.GetOrNew }srtp和srtcp流向是這樣的:
客戶端---srtp--->srtp.ReadStreamSRTP------->SFU 客戶端<---srtcp---srtp.ReadStreamSRTCP<------SFU當包到達pion/srtp時,就會觸發ReadStreamSRTP.init函數和ReadStreamSRTCP.init函數。
ReadStreamSRTP.init調用自定義的BufferFactory.GetOrNew函數了,new了一個buffer。
ReadStreamSRTCP.init調用自定義的BufferFactory.GetOrNew函數,new一個rtcpReader。
之后收發rtp和rtcp包,就會流經這個buffer和rtcpReader:
https://github.com/pion/srtp/blob/3c34651fa0c6de900bdc91062e7ccb5992409643/stream_srtp.go#L53
為什么這么搞呢?
仔細想想,如果控制了rtp和rtcp的buffer,是不是計算twcc、nack、stats等就很方便了?在buffer寫入包的同時,就可以通過設置的回調函數搞各種復雜計算。
router.AddReceiver
接下來可以看到buffer的各種回調。
func(r*router)AddReceiver(receiver*webrtc.RTPReceiver,track*webrtc.TrackRemote)(Receiver,bool){r.Lock()deferr.Unlock()publish:=falsetrackID:=track.ID()//這里獲取了之前init函數中,new出來的buffer和rtcpReader,開始搞事情buff,rtcpReader:=r.bufferFactory.GetBufferPair(uint32(track.SSRC()))//設置rtcp的回調,比如nack、twcc、rrbuff.OnFeedback(func(fb[]rtcp.Packet){r.rtcpCh<-fb})if track.Kind()==webrtc.RTPCodecTypeAudio{streamID:=track.StreamID()//如果是音頻track,設置OnAudioLevel回調buff.OnAudioLevel(func(leveluint8){r.session.AudioObserver().observe(streamID,level)})r.session.AudioObserver().addStream(streamID)}else if track.Kind()==webrtc.RTPCodecTypeVideo{if r.twcc==nil{//如果是視頻track,創建twcc計算器,并設置回調,當計算器生成twcc包就會回調r.twcc=twcc.NewTransportWideCCResponder(uint32(track.SSRC()))r.twcc.OnFeedback(func(prtcp.RawPacket){r.rtcpCh<-[]rtcp.Packet{&p}})}//設置buffer的twcc回調,buffer收到包后調用,塞入twcc計算器//twcc計算生成rtcp包,再回調OnFeedback發送給客戶端buff.OnTransportWideCC(func(snuint16,timeNSint64,markerbool){r.twcc.Push(sn,timeNS,marker)})}if r.config.WithStats{r.stats[uint32(track.SSRC())]=stats.NewStream(buff)}//設置rtcpReader.OnPacketrtcpReader.OnPacket(func(bytes[]byte){//收到SDES、SR包做些處理})recv,ok:=r.receivers[trackID]if!ok{//創建WebRTCReceiver并設置回調recv=NewWebRTCReceiver(receiver,track,r.id)r.receivers[trackID]=recvrecv.SetRTCPCh(r.rtcpCh)recv.OnCloseHandler(func(){。。。。})publish=true}//把track buffer塞入recvrecv.AddUpTrack(track,buff,r.config.Simulcast.BestQualityFirst)//初始化buffbuff.Bind(receiver.GetParameters(),buffer.Options{MaxBitRate:r.config.MaxBandwidth,})。。。return recv,publish }這里很重要,WebRTCReceiver是真實負責收發包的,可以看到AddUpTrack已經把buffer塞進去了。
接下來看一下AddUpTrack是如何工作的:
func(w*WebRTCReceiver)AddUpTrack(track*webrtc.TrackRemote,buff*buffer.Buffer,bestQualityFirstbool){if w.closed.get(){return}//根據RID來區分layervarlayerintswitchtrack.RID(){//如果沒開simulcast,為""casefullResolution:layer=2casehalfResolution:layer=1default:layer=0//如果沒開simulcast,為0}w.Lock()//設置空域層layer的trackw.upTracks[layer]=track//設置空域層layer的buffw.buffers[layer]=buffw.available[layer].set(true)//設置空域層layer的downtrack,這里的[]*DownTrack數組,訂閱該layer的downtrack存在這里w.downTracks[layer].Store(make([]*DownTrack,0,10))w.pendingTracks[layer]=make([]*DownTrack,0,10)w.Unlock()//閉包函數,按最佳質量訂閱,切到f層subBestQuality:=func(targetLayerint){for l:=0;l<targetLayer;l++{dts:=w.downTracks[l].Load()if dts==nil{continue}for_,dt:=rangedts.([]*DownTrack){_=dt.SwitchSpatialLayer(int32(targetLayer),false)}}}//閉包函數,按最差質量訂閱,切到q層subLowestQuality:=func(targetLayerint){for l:=2;l!=targetLayer;l--{dts:=w.downTracks[l].Load()if dts==nil{continue}for_,dt:=rangedts.([]*DownTrack){_=dt.SwitchSpatialLayer(int32(targetLayer),false)}}}//是否開啟大小流if w.isSimulcast{//如果配置最佳質量,則等到f層到來時,訂閱它if bestQualityFirst&&(!w.available[2].get()||layer==2){subBestQuality(layer)//如果配置最差質量,則等到q層到來時,訂閱它}else if!bestQualityFirst&&(!w.available[0].get()||layer==0){subLowestQuality(layer)}}//啟動讀寫流程go w.writeRTP(layer) }真正的收發包流程來了:
func(w*WebRTCReceiver)writeRTP(layerint){defer func(){//這里設置自動清理函數w.closeOnce.Do(func(){w.closed.set(true)w.closeTracks()})}()//創建一個PLI包,后邊要用pli:=[]rtcp.Packet{&rtcp.PictureLossIndication{SenderSSRC:rand.Uint32(),MediaSSRC:w.SSRC(layer)},}for {//這里可以看到,真正讀包是從buffer里讀出來的,正是前邊講到的自定義bufferpkt,err:=w.buffers[layer].ReadExtended()if err==io.EOF{return}//如果開啟大小流if w.isSimulcast{ 。。。//這里跳過,以后再講}for_,dt:=rangew.downTracks[layer].Load().([]*DownTrack){//下行track寫入rtp包,這樣訂閱者可以收到流了if err=dt.WriteRTP(pkt,layer);err!=nil{if?err==io.EOF&&err==io.ErrClosedPipe{w.Lock()w.deleteDownTrack(layer,dt.id)w.Unlock()}log.Error().Err(err).Str("id",dt.id).Msg("Error writing to down track")}}}}3.發包流程
SessionLocal.Publish
func(s*SessionLocal)Publish(routerRouter,rReceiver){for_,p:=ranges.Peers(){// Don't sub to selfif router.ID()==p.ID()||p.Subscriber()==nil{continue}//表示根據r的信息創建downtrack,并增加到p.Subscriber()和r中if err:=router.AddDownTracks(p.Subscriber(),r);err!=nil{Logger.Error(err,"Error subscribing transport to Router")continue}} }router.AddDownTracks
func(r*router)AddDownTracks(s*Subscriber,recvReceiver)error{ 。。。 //如果recv不為空,表示根據recv的信息創建downtrack,并增加到s和recv中if recv!=nil{if_,err:=r.AddDownTrack(s,recv);err!=nil{return err}s.negotiate()return nil} //如果recv為空,表示遍歷房間中所有的receivers,并增加到s和recv中if len(r.receivers)>0{for_,rcv:=ranger.receivers{if_,err:=r.AddDownTrack(s,rcv);err!=nil{return err}}s.negotiate()}return nil }router.AddDownTrack
根據recv的信息創建downtrack,并增加到sub和recv中。
func(r*router)AddDownTrack(sub*Subscriber,recvReceiver)(*DownTrack,error){for_,dt:=rangesub.GetDownTracks(recv.StreamID()){//避免重復添加if dt.ID()==recv.TrackID(){return dt,nil}}codec:=recv.Codec()if err:=sub.me.RegisterCodec(codec,recv.Kind());err!=nil{return nil,err}//創建downtrack,downtrack用來給客戶端下發流downTrack,err:=NewDownTrack(webrtc.RTPCodecCapability{MimeType: codec.MimeType,ClockRate: codec.ClockRate,Channels: codec.Channels,SDPFmtpLine: codec.SDPFmtpLine,RTCPFeedback:[]webrtc.RTCPFeedback{{"goog-remb",""},{"nack",""},{"nack","pli"}},},recv,r.bufferFactory,sub.id,r.config.MaxPacketTrack)if err!=nil{return nil,err}//把downtrack增加到pc中if downTrack.transceiver,err=sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit{Direction:webrtc.RTPTransceiverDirectionSendonly,});err!=nil{return nil,err}// 設置關閉回調,關閉時pc自動刪除trackdownTrack.OnCloseHandler(func(){if sub.pc.ConnectionState()!=webrtc.PeerConnectionStateClosed{if err:=sub.pc.RemoveTrack(downTrack.transceiver.Sender());err!=nil{if err==webrtc.ErrConnectionClosed{return}Logger.Error(err,"Error closing down track")}else{//如果刪除成功,再從sub中刪除,然后重協商sub.RemoveDownTrack(recv.StreamID(),downTrack)sub.negotiate()}}})//設置OnBind回調,DownTrack.Bind()里會調用這個;PC協商完成時,DownTrack.Bind()會觸發downTrack.OnBind(func(){gosub.sendStreamDownTracksReports(recv.StreamID())})//增加downTrack到sub中,sub只是用來管理downtracks和生成SenderReport等sub.AddDownTrack(recv.StreamID(),downTrack)//增加downTrack到WebRTCReceiver中,實際收發包是WebRTCReceiver來控制,在writeRTP中recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)returndownTrack,nil }這樣下行track也增加好了,之前的writeRTP可以正常工作了。
4.總結
收發包邏輯打通步驟:
SDK推流---->OnTrack---->router.AddReceiver(設置Buffer和上行Track)------>SessionLocal.Publish(設置下行Track)---->收發包邏輯打通
收發包流程圖簡單總結:
srtp.write--->buffer.write--->buffer.ReadExtended--->downtrack.writeRTP收包流程堆棧:
github.com/pion/ion-sfu/pkg/buffer.(*Buffer).Write (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/buffer/buffer.go:187) github.com/pion/srtp/v2.(*ReadStreamSRTP).write (/Volumes/vm/workspace/go/pkg/mod/github.com/pion/srtp/v2@v2.0.5/stream_srtp.go:64) github.com/pion/srtp/v2.(*SessionSRTP).decrypt?(/Volumes/vm/workspace/go/pkg/mod/github.com/pion/srtp/v2@v2.0.5/session_srtp.go:166)發包流程堆棧:
github.com/pion/ion-sfu/pkg/buffer.(*Buffer).ReadExtended (/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/buffer/buffer.go:236) github.com/pion/ion-sfu/pkg/sfu.(*WebRTCReceiver).writeRTP?(/Volumes/vm/workspace/go/src/github.com/pion/ion-sfu/pkg/sfu/receiver.go:345)作者簡介:
王朋闖:前百度RTN資深工程師,前金山云RTC技術專家,前VIPKID流媒體架構師,ION開源項目發起人。
特別說明:
本文發布于知乎,已獲得作者授權轉載。
掃描圖中二維碼或點擊閱讀原文
了解大會更多信息
喜歡我們的內容就點個“在看”吧!
總結
以上是生活随笔為你收集整理的大话ion系列(三)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【今晚8点半】:对话袁家军——成都的多媒
- 下一篇: 为什么视频压缩如此重要