大话ion系列(四)
點擊上方“LiveVideoStack”關注我們
作者 | 王朋闖
本文為王朋闖老師創(chuàng)作的系列ion文章,LiveVideoStack已獲得授權發(fā)布,未來將持續(xù)更新。
大話ion系列(一)
大話ion系列(二)
大話ion系列(三)
七、Simulcast流程
1. Simulcast概念
先介紹WebRTC的一個概念——Simulcast(聯(lián)播,俗稱大小流):
上行一般是三路流,按分辨率和碼率,一般分為fhq(大中小)三層
下行可以分給不同的用戶不同的流,比如網(wǎng)不好時分發(fā)個小流q,網(wǎng)變好了再切回大流f
三層的streamId、trackId是一樣的,但是rid和ssrc是不同的,rid一般是f、h、q
對應的SDP部分
2.收發(fā)流程
看本章之前,最好看一下前一章,熟悉一下收發(fā)流程,本文只重點介紹其中的Simulcast部分。
收發(fā)包邏輯打通步驟:
SDK推流---->OnTrack---->router.AddReceiver(設置Buffer和上行Track)------>SessionLocal.Publish(設置下行Track)---->收發(fā)包邏輯打通
3.Simulcast上行流程
非Simulcast情況,OnTrack一般會觸發(fā)兩次:一個audioTrack+一個videoTrack。
Simulcast下,OnTrack一般會觸發(fā)四次:一個audioTrack+三個videoTrack(rid分別為fhq)。
這個流程會觸發(fā)四次:
OnTrack--->router.AddReceiver--->WebRTCReceiver.AddUpTrack三個videoTrack,共用同一個WebRTCReceiver。
type WebRTCReceiver struct { 。。。receiver *webrtc.RTPReceivercodec webrtc.RTPCodecParametersrtcpCh chan []rtcp.Packetbuffers [3]*buffer.Buffer//需要三個bufferupTracks [3]*webrtc.TrackRemote//三個TrackRemote 。。。pendingTracks [3][]*DownTrack//三個層,每層來訂閱的downtrack 。。。 }接下來看一下AddUpTrack是如何工作的:
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote,buff *buffer.Buffer, bestQualityFirst bool) {if w.closed.get() {return}//根據(jù)RID來區(qū)分layervar layer intswitch track.RID() {//如果沒開simulcast,為""case fullResolution:layer = 2case halfResolution:layer = 1default:layer = 0//如果沒開simulcast,為0}w.Lock()//設置空域層layer的trackw.upTracks[layer] = track//設置空域層layer的buffw.buffers[layer] = buffw.available[layer].set(true)//設置空域層layer的downtrackw.downTracks[layer].Store(make([]*DownTrack,0, 10))w.pendingTracks[layer] = make([]*DownTrack,0, 10)w.Unlock()//閉包函數(shù),按最佳質量訂閱,切到f層subBestQuality := func(targetLayerint) {for l := 0; l <targetLayer; l++ {dts :=w.downTracks[l].Load()if dts == nil{continue}for _, dt :=range dts.([]*DownTrack) {_ = dt.SwitchSpatialLayer(int32(targetLayer), false)}}}//閉包函數(shù),按最差質量訂閱,切到q層subLowestQuality := func(targetLayerint) {for l := 2; l !=targetLayer; l-- {dts :=w.downTracks[l].Load()if dts == nil{continue}for _, dt :=range dts.([]*DownTrack) {_ = dt.SwitchSpatialLayer(int32(targetLayer), false)}}}//是否開啟大小流if w.isSimulcast {//如果配置最佳質量,則等到f層到來時,訂閱它if bestQualityFirst &&(!w.available[2].get() || layer == 2) {subBestQuality(layer)//如果配置最差質量,則等到q層到來時,訂閱它} else if!bestQualityFirst && (!w.available[0].get() ||layer == 0) {subLowestQuality(layer)}}//啟動讀寫流程go w.writeRTP(layer) }真正的收發(fā)包流程來了:
func (w *WebRTCReceiver) writeRTP(layer int) {defer func() {//這里設置自動清理函數(shù)w.closeOnce.Do(func() {w.closed.set(true)w.closeTracks()})}()//創(chuàng)建一個PLI包,后邊要用pli := []rtcp.Packet{&rtcp.PictureLossIndication{SenderSSRC:rand.Uint32(), MediaSSRC: w.SSRC(layer)},}for {//這里可以看到,真正讀包是從buffer里讀出來的,正是前邊講到的自定義bufferpkt, err :=w.buffers[layer].ReadExtended()if err ==io.EOF {return}//如果開啟大小流if w.isSimulcast {//一開始是pending狀態(tài)ifw.pending[layer].get() {//如果收到的包是關鍵幀ifpkt.KeyFrame {w.Lock()//如果有切換中的layer,那就切一下for idx,dt := range w.pendingTracks[layer] {w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)w.storeDownTrack(layer, dt)dt.SwitchSpatialLayerDone(int32(layer))w.pendingTracks[layer][idx] = nil}w.pendingTracks[layer] = w.pendingTracks[layer][:0]w.pending[layer].set(false)w.Unlock()} else {//如果是非關鍵字,說明需要發(fā)送PLIw.SendRTCP(pli)}}}//這里是不是有疑問,[]*downTracks是SessionLocal.Publish里塞過來的,后邊會介紹:)for _, dt := rangew.downTracks[layer].Load().([]*DownTrack){//下行track寫入rtp包if err = dt.WriteRTP(pkt, layer);err != nil {if err ==io.EOF && err == io.ErrClosedPipe {w.Lock()w.deleteDownTrack(layer, dt.id)w.Unlock()}log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")}}}}至此一個簡單的Simulcast收發(fā)模型:
SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK| |....| |--->downTracks[0][N].WriteRTP||---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP| |....| |---->downTracks[0][N].WriteRTP||------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP| |....| |----->downTracks[1][N].WriteRTP||------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP|....|------>downTracks[2][N].WriteRTP上面省略了SDK--->ReadStreamSRTP.buffer.Write,這個buffer和WebRTCReceiver.buffer是同一個。
訂閱端SDK的切大小流操作,其實就是在0-2來回掛載downTrack而已。
4.Simulcast下行流程
讀者前邊的疑問,downTracks是哪里塞過來的?流程在這里:
OnTrack--->SessionLocal.Publish--->router.AddDownTracks--->router.AddDownTrack--->WebRTCReceiver.AddDownTrack--->WebRTCReceiver.storeDownTrack
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver){//Simulcast一般會觸發(fā)OnTrack四次,一個audio,三個video//由于三個video的trackId一樣,共用一個WebRTCReceiverr, pub := p.router.AddReceiver(receiver,track)if pub {//這里video到來的第一次pub才為true//這里把receiver發(fā)布到router里,其他peer的downtrack會掛載到receiver下p.session.Publish(p.router, r)這里為了方便,再貼一下整個流程的代碼,比較繁瑣,可以跳過。
SessionLocal.Publish
func (s *SessionLocal) Publish(router Router,r Receiver) {for _, p := ranges.Peers() {// Don't sub toselfif router.ID() == p.ID() || p.Subscriber() == nil{continue}//表示根據(jù)r的信息創(chuàng)建downtrack,并增加到p.Subscriber()和r中if err :=router.AddDownTracks(p.Subscriber(), r); err !=nil {Logger.Error(err, "Errorsubscribing transport to Router")continue}} }router.AddDownTracks
func (r *router) AddDownTracks(s *Subscriber,recv Receiver) error { 。。。 //如果recv不為空,表示根據(jù)recv的信息創(chuàng)建downtrack,并增加到s和recv中if recv != nil{if _, err :=r.AddDownTrack(s, recv); err != nil {return err}s.negotiate()return nil} //如果recv為空,表示遍歷房間中所有的receivers,并增加到s和recv中if len(r.receivers)> 0 {for _, rcv := ranger.receivers {if _, err :=r.AddDownTrack(s, rcv); err != nil {return err}}s.negotiate()}return nil }router.AddDownTrack
根據(jù)recv的信息創(chuàng)建downtrack,并增加到sub和recv中。
func (r *router) AddDownTrack(sub *Subscriber,recv Receiver) (*DownTrack, error) {for _, dt := rangesub.GetDownTracks(recv.StreamID()) {//避免重復添加if dt.ID() ==recv.TrackID() {return dt, nil}}codec := recv.Codec()if err := sub.me.RegisterCodec(codec, recv.Kind()); err !=nil {return nil,err}//創(chuàng)建downtrack,downtrack用來給客戶端下發(fā)流downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{MimeType: codec.MimeType,ClockRate: codec.ClockRate,Channels: codec.Channels,SDPFmtpLine: codec.SDPFmtpLine,RTCPFeedback:[]webrtc.RTCPFeedback{{"goog-remb", ""}, {"nack", ""}, {"nack", "pli"}},}, recv, r.bufferFactory,sub.id, r.config.MaxPacketTrack)if err != nil{return nil,err}//把downtrack增加到pc中if downTrack.transceiver,err = sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit{Direction:webrtc.RTPTransceiverDirectionSendonly,}); err != nil {return nil,err}// 設置關閉回調,關閉時pc自動刪除trackdownTrack.OnCloseHandler(func() {if sub.pc.ConnectionState() !=webrtc.PeerConnectionStateClosed {if err :=sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err !=nil {if err ==webrtc.ErrConnectionClosed {return}Logger.Error(err, "Errorclosing down track")} else {//如果刪除成功,再從sub中刪除,然后重協(xié)商sub.RemoveDownTrack(recv.StreamID(), downTrack)sub.negotiate()}}})//設置OnBind回調,DownTrack.Bind()里會調用這個;PC協(xié)商完成時,DownTrack.Bind()會觸發(fā)downTrack.OnBind(func() {go sub.sendStreamDownTracksReports(recv.StreamID())})//增加downTrack到sub中,sub只是用來管理downtracks和生成SenderReport等sub.AddDownTrack(recv.StreamID(), downTrack)//增加downTrack到WebRTCReceiver中,實際收發(fā)包是WebRTCReceiver來控制,在writeRTP中recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)return downTrack, nil }5.Simulcast切換流程
第一種,自動切換。
上邊的subBestQuality,會在f層receiver到來時,自動訂閱f層。
第二種,手動切換。
通過信令或datachannel控制來切換。
先來講一下datachannel信令通道,在main里創(chuàng)建了一個內置dc,處理函數(shù)為datachannel.SubscriberAPI。
func main() {nsfu := sfu.NewSFU(conf.Config)dc :=nsfu.NewDatachannel(sfu.APIChannelLabel)dc.Use(datachannel.SubscriberAPI)s :=server.NewWrapperedGRPCWebServer(options, nsfu)if err := s.Serve(); err != nil{logger.Error(err,"failed to serve")os.Exit(1)}select {} }客戶端發(fā)過來的切大小流指令會進入此函數(shù)。
funcSubscriberAPI(nextsfu.MessageProcessor) sfu.MessageProcessor {return sfu.ProcessFunc(func(ctxcontext.Context, args sfu.ProcessArgs) {srm := &setRemoteMedia{}if err :=json.Unmarshal(args.Message.Data, srm); err != nil {return}// Publisherchanging active layersif srm.Layers !=nil && len(srm.Layers) > 0 { 。。。//當前sdk邏輯不會進入這里} else {//按流ID查找downTracksdownTracks :=args.Peer.Subscriber().GetDownTracks(srm.StreamID)for _, dt :=range downTracks {switch dt.Kind() {casewebrtc.RTPCodecTypeAudio:dt.Mute(!srm.Audio)//音頻是否需要mute/unmutecasewebrtc.RTPCodecTypeVideo:switchsrm.Video {//視頻是否需要切大小流/mutecasehighValue://這里把d.reSync.set設置為true了,writeSimulcastRTP里會自動發(fā)PLIdt.Mute(false)dt.SwitchSpatialLayer(2, true)casemediumValue:dt.Mute(false)dt.SwitchSpatialLayer(1, true)caselowValue:dt.Mute(false)dt.SwitchSpatialLayer(0, true)casemutedValue:dt.Mute(true)}switchsrm.Framerate {//當前sdk邏輯也不會進入這里,srm.Framerate=""}}}}next.Process(ctx, args)}) }DownTrack.SwitchSpatialLayer
func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error {if d.trackType ==SimulcastDownTrack {// Don't switchuntil previous switch is done or canceledcsl := atomic.LoadInt32(&d.currentSpatialLayer)//如果當前運行l(wèi)ayer不是正在切的layer,或當前l(fā)ayer是要切的//換句話說,如果當前l(fā)ayer沒切完成,或者當前l(fā)ayer和要切的一樣,那就返回錯誤if csl !=atomic.LoadInt32(&d.targetSpatialLayer) || csl ==targetLayer {returnErrSpatialLayerBusy}//切換layerif err :=d.receiver.SwitchDownTrack(d, int(targetLayer));err == nil {atomic.StoreInt32(&d.targetSpatialLayer,targetLayer)if setAsMax {atomic.StoreInt32(&d.maxSpatialLayer,targetLayer)}}return nil}returnErrSpatialNotSupported }WebRTCReceiver.SwitchDownTrack
func (w *WebRTCReceiver) SwitchDownTrack(track *DownTrack,layer int) error {if w.closed.get() {returnerrNoReceiverFound}//切換就是把track放入pendingif w.available[layer].get() {w.Lock()w.pending[layer].set(true)w.pendingTracks[layer] = append(w.pendingTracks[layer],track)w.Unlock()return nil}return errNoReceiverFound }然后在writeRTP里切換:
func (w *WebRTCReceiver) writeRTP(layer int) { ....for {pkt, err :=w.buffers[layer].ReadExtended()if err ==io.EOF {return}//如果是大小流if w.isSimulcast {//如果正在切換,pending[layer]get()為trueifw.pending[layer].get() {// 如果是關鍵幀,才會切換,好在前邊Mute流程里發(fā)送了PLI,這里應該很快來一個關鍵幀ifpkt.KeyFrame {w.Lock()//=========這里切換for idx, dt:= range w.pendingTracks[layer] {//刪除原來的w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)//存儲新的dt,以后writeRTP會寫入新的dtw.storeDownTrack(layer, dt)//設置切換完成dt.SwitchSpatialLayerDone(int32(layer))//pending中此dt置空w.pendingTracks[layer][idx] = nil}//清空pendingTracks此layerw.pendingTracks[layer] = w.pendingTracks[layer][:0]//標志位置為falsew.pending[layer].set(false)w.Unlock()} else {// 如果不是關鍵幀,再次發(fā)送PLIw.SendRTCP(pli)}}}for _, dt := rangew.downTracks[layer].Load().([]*DownTrack){if err = dt.WriteRTP(pkt, layer);err != nil {if err ==io.EOF && err == io.ErrClosedPipe {w.Lock()w.deleteDownTrack(layer, dt.id)w.Unlock()}log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")}}} }6. 總結
Simulcast在ion-sfu中,默認是通過datachannel來操作切換的。
首先,切換是操作pendingTracks:
SubscriberAPI---》dt.SwitchSpatialLayer-->WebRTCReceiver.SwitchDownTrack--->寫入pendingTracks
然后,在WebRTCReceiver.writeRTP里進行實質切換:
WebRTCReceiver.writeRTP--->讀取pendingTracks---》更換downTracks--》storeDownTrack--》OK
之后,寫包就會寫入新track。至此一個簡單的Simulcast收發(fā)模型就建成了:
SDK---SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK| |....| |--->downTracks[0][N].WriteRTP||---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP| |....| |---->downTracks[0][N].WriteRTP||------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP| |....| |----->downTracks[1][N].WriteRTP||------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP|....|------>downTracks[2][N].WriteRTP作者簡介:
王朋闖:前百度RTN資深工程師,前金山云RTC技術專家,前VIPKID流媒體架構師,ION開源項目發(fā)起人。
特別說明:
本文發(fā)布于知乎,已獲得作者授權轉載。
講師招募
LiveVideoStackCon 2022 音視頻技術大會 上海站,正在面向社會公開招募講師,無論你所處的公司大小,title高低,老鳥還是菜鳥,只要你的內容對技術人有幫助,其他都是次要的。歡迎通過?speaker@livevideostack.com?提交個人資料及議題描述,我們將會在24小時內給予反饋。
喜歡我們的內容就點個“在看”吧!
總結
以上是生活随笔為你收集整理的大话ion系列(四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是视频预处理?
- 下一篇: 【城市沙龙】LiveVideoStack