生活随笔
收集整理的這篇文章主要介紹了
srs代码学习(4)-怎么转发流
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
publish的流和play的流怎么連接呢?這個恐怕是最繞的地方了。看了一上午的代碼,淹沒于各種數據結構與流程之中后,俺終于發現了連接publish和play的關鍵連個類是
SrsSource
SrsConsumer
負責連接著連個類實例的是
SrsRtmpConn?
下面我們詳細講解連接過程
上片我們說到。在底層客戶端連接上來后,會經過一系列處理,最后繞到SrsRtmpConn類的循環函數中。就是下面的函數
[cpp]?view plaincopy
int?SrsConnection::cycle()?? {?? ????int?ret?=?ERROR_SUCCESS;?? ?????? ????_srs_context->generate_id();?? ????id?=?_srs_context->get_id();?? ?????? ????ip?=?srs_get_peer_ip(st_netfd_fileno(stfd));?? ?????? ????ret?=?do_cycle();?? ?????? ?????? ????if?(srs_is_client_gracefully_close(ret))?{?? ????????ret?=?ERROR_SOCKET_CLOSED;?? ????}?? ?????? ?????? ????if?(ret?==?ERROR_SUCCESS)?{?? ????????srs_trace("client?finished.");?? ????}?? ?????? ?????? ????if?(ret?==?ERROR_SOCKET_CLOSED)?{?? ????????srs_warn("client?disconnect?peer.?ret=%d",?ret);?? ????}?? ?? ????return?ERROR_SUCCESS;?? }??
這個是SrsRtmpConn的基類SrsConnection的函數。在基類里,do_cycle()是個純虛函數。具體實現完全是靠這子類來的。
那么rtmp類型的這個子類,到底有多么的變態呢,先看看我畫的一個流程圖,都沒有畫完。一張放不下,的截好幾張
夠長的,這里我還只是畫到了播放的時候,發布流程還沒有畫。因為太復雜了。
下面開始一步一步的分析
首先看do_cycle()函數這個函數主要負責握手和連命令。并在成功后。獲取流的配置信息。關鍵代碼如下
[cpp]?view plaincopy
if?((ret?=?rtmp->handshake())?!=?ERROR_SUCCESS)?{?? ????????srs_error("rtmp?handshake?failed.?ret=%d",?ret);?? ????????return?ret;?? ????}?? ????srs_verbose("rtmp?handshake?success");?? ?????? ????if?((ret?=?rtmp->connect_app(req))?!=?ERROR_SUCCESS)?{?? ????????srs_error("rtmp?connect?vhost/app?failed.?ret=%d",?ret);?? ????????return?ret;?? ????}?? ????srs_verbose("rtmp?connect?app?success");??
注意這里有一個比較重要的數據結構
[cpp]?view plaincopy
SrsRequest*?req??
這個主要是存儲請求信息的,比如app turl streamid等等。
在各種分析后,進入下一個cycle,service_cycle()函數
service_cycly()函數在做了一些設置工作,設置比如chunk size。代碼如下
[cpp]?view plaincopy
if?((ret?=?rtmp->set_window_ack_size((int)(2.5?*?1000?*?1000)))?!=?ERROR_SUCCESS)?{?? ???????srs_error("set?window?acknowledgement?size?failed.?ret=%d",?ret);?? ???????return?ret;?? ???}?? ???srs_verbose("set?window?acknowledgement?size?success");?? ????????? ???if?((ret?=?rtmp->set_peer_bandwidth((int)(2.5?*?1000?*?1000),?2))?!=?ERROR_SUCCESS)?{?? ???????srs_error("set?peer?bandwidth?failed.?ret=%d",?ret);?? ???????return?ret;?? ???}??
下面一段代碼沒有看明白。這個是一個補丁打上去的,說說為了做do token traverse。這個暫時先不研究了。
[cpp]?view plaincopy
if?(true)?{?? ????????bool?vhost_is_edge?=?_srs_config->get_vhost_is_edge(req->vhost);?? ????????bool?edge_traverse?=?_srs_config->get_vhost_edge_token_traverse(req->vhost);?? ????????if?(vhost_is_edge?&&?edge_traverse)?{?? ????????????if?((ret?=?check_edge_token_traverse_auth())?!=?ERROR_SUCCESS)?{?? ????????????????srs_warn("token?auth?failed,?ret=%d",?ret);?? ????????????????return?ret;?? ????????????}?? ????????}?? ????}??
接著設置chunk 的大小
[cpp]?view plaincopy
int?chunk_size?=?_srs_config->get_chunk_size(req->vhost);?? ???if?((ret?=?rtmp->set_chunk_size(chunk_size))?!=?ERROR_SUCCESS)?{?? ???????srs_error("set?chunk_size=%d?failed.?ret=%d",?chunk_size,?ret);?? ???????return?ret;?? ???}??
回應客戶端。連接ok
[cpp]?view plaincopy
if?((ret?=?rtmp->response_connect_app(req,?local_ip.c_str()))?!=?ERROR_SUCCESS)?{?? ???????srs_error("response?connect?app?failed.?ret=%d",?ret);?? ???????return?ret;?? ???}??
然后連接就結束了,進入stream_service_cycle()函數,從名字上就可以看出。這個函數是開始就如流命令時代
[cpp]?view plaincopy
while?(!disposed)?{?? ????ret?=?stream_service_cycle();?? ?????? ?????? ?????? ????if?(ret?==?ERROR_SUCCESS)?{?? ????????continue;?? ????}?? ?????? ?????? ????if?(!srs_is_system_control_error(ret))?{?? ????????if?(ret?!=?ERROR_SOCKET_TIMEOUT?&&?!srs_is_client_gracefully_close(ret))?{?? ????????????srs_error("stream?service?cycle?failed.?ret=%d",?ret);?? ????????}?? ????????return?ret;?? ????}?? ?????? ?????? ????if?(ret?==?ERROR_CONTROL_REPUBLISH)?{?? ?????????? ????????rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);?? ????????rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);?? ?????????? ????????srs_trace("control?message(unpublish)?accept,?retry?stream?service.");?? ????????continue;?? ????}?? ?????? ?????? ?????? ????if?(ret?==?ERROR_CONTROL_RTMP_CLOSE)?{?? ?????????? ?????????? ?????????? ????????rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);?? ????????rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);?? ?????????? ????????srs_trace("control?message(close)?accept,?retry?stream?service.");?? ????????continue;?? ????}?? ?????? ?????? ????srs_error("control?message(%d)?reject?as?error.?ret=%d",?ret,?ret);?? ????return?ret;?? }??
stream_service_cycle()函數閃亮登場
首先進行一些安全驗證
[cpp]?view plaincopy
f?((ret?=?rtmp->identify_client(res->stream_id,?type,?req->stream,?req->duration))?!=?ERROR_SUCCESS)?{?? ????????if?(!srs_is_client_gracefully_close(ret))?{?? ????????????srs_error("identify?client?failed.?ret=%d",?ret);?? ????????}?? ????????return?ret;?? ????}?? ????req->strip();?? ????srs_trace("client?identified,?type=%s,?stream_name=%s,?duration=%.2f",??? ????????srs_client_type_string(type).c_str(),?req->stream.c_str(),?req->duration);?? ?????? ?????? ????if?((ret?=?security->check(type,?ip,?req))?!=?ERROR_SUCCESS)?{?? ????????srs_error("security?check?failed.?ret=%d",?ret);?? ????????return?ret;?? ????}?? ????srs_info("security?check?ok");??
然后進入比較有意思的環節
[cpp]?view plaincopy
SrsSource*?source?=?SrsSource::fetch(req);?? ???if?(!source)?{?? ???????if?((ret?=?SrsSource::create(req,?server,?server,?&source))?!=?ERROR_SUCCESS)?{?? ???????????return?ret;?? ???????}?? ???}?? ???srs_assert(source?!=?NULL);??
根據req,尋找是否有這個源,如果沒有,那么久創建一個。主要creat()是個靜態函數。實現代碼為
[cpp]?view plaincopy
int?SrsSource::create(SrsRequest*?r,?ISrsSourceHandler*?h,?ISrsHlsHandler*?hh,?SrsSource**?pps)?? {?? ????int?ret?=?ERROR_SUCCESS;?? ?????? ????string?stream_url?=?r->get_stream_url();?? ????string?vhost?=?r->vhost;?? ?????? ?????? ????srs_assert?(pool.find(stream_url)?==?pool.end());?? ?? ????SrsSource*?source?=?new?SrsSource();?? ????if?((ret?=?source->initialize(r,?h,?hh))?!=?ERROR_SUCCESS)?{?? ????????srs_freep(source);?? ????????return?ret;?? ????}?? ?????????? ????pool[stream_url]?=?source;?? ????srs_info("create?new?source?for?url=%s,?vhost=%s",?stream_url.c_str(),?vhost.c_str());?? ?????? ????*pps?=?source;?? ?????? ????return?ret;?? }??
創建一個新的source,并且放到poo中。pool是什么
[cpp]?view plaincopy
static?std::map<std::string,?SrsSource*>?pool;??
也是一個全局的靜態變量,用了存儲所欲的源。到此。謎底進一步解開了。
同意fetch()函數也是靜態的。
[cpp]?view plaincopy
static?SrsSource*?fetch(SrsRequest*?r);?? static?SrsSource*?fetch(std::string?vhost,?std::string?app,?std::string?stream);??
ok!在繞道循環函數看看接下來該怎么辦
[cpp]?view plaincopy
SrsStatistic*?stat?=?SrsStatistic::instance();?? ????if?((ret?=?stat->on_client(_srs_context->get_id(),?req,?this,?type))?!=?ERROR_SUCCESS)?{?? ????????srs_error("stat?client?failed.?ret=%d",?ret);?? ????????return?ret;?? ????}??
這個是做統計用的,沒啥。
[cpp]?view plaincopy
bool?vhost_is_edge?=?_srs_config->get_vhost_is_edge(req->vhost);?? ????bool?enabled_cache?=?_srs_config->get_gop_cache(req->vhost);?? ????srs_trace("source?url=%s,?ip=%s,?cache=%d,?is_edge=%d,?source_id=%d[%d]",?? ????????req->get_stream_url().c_str(),?ip.c_str(),?enabled_cache,?vhost_is_edge,??? ????????source->source_id(),?source->source_id());?? ????source->set_cache(enabled_cache);??
判斷是否是邊緣節點,是否需要gop緩沖。無他
[cpp]?view plaincopy
switch?(type)?{?? ???????case?SrsRtmpConnPlay:?{?? ???????????srs_verbose("start?to?play?stream?%s.",?req->stream.c_str());?? ????????????? ????????????? ???????????if?((ret?=?rtmp->start_play(res->stream_id))?!=?ERROR_SUCCESS)?{?? ???????????????srs_error("start?to?play?stream?failed.?ret=%d",?ret);?? ???????????????return?ret;?? ???????????}?? ???????????if?((ret?=?http_hooks_on_play())?!=?ERROR_SUCCESS)?{?? ???????????????srs_error("http?hook?on_play?failed.?ret=%d",?ret);?? ???????????????return?ret;?? ???????????}?? ????????????? ???????????srs_info("start?to?play?stream?%s?success",?req->stream.c_str());?? ???????????ret?=?playing(source);?? ???????????http_hooks_on_stop();?? ????????????? ???????????return?ret;?? ???????}?? ???????case?SrsRtmpConnFMLEPublish:?{?? ???????????srs_verbose("FMLE?start?to?publish?stream?%s.",?req->stream.c_str());?? ????????????? ???????????if?((ret?=?rtmp->start_fmle_publish(res->stream_id))?!=?ERROR_SUCCESS)?{?? ???????????????srs_error("start?to?publish?stream?failed.?ret=%d",?ret);?? ???????????????return?ret;?? ???????????}?? ????????????? ???????????return?publishing(source);?? ???????}?? ???????case?SrsRtmpConnFlashPublish:?{?? ???????????srs_verbose("flash?start?to?publish?stream?%s.",?req->stream.c_str());?? ????????????? ???????????if?((ret?=?rtmp->start_flash_publish(res->stream_id))?!=?ERROR_SUCCESS)?{?? ???????????????srs_error("flash?start?to?publish?stream?failed.?ret=%d",?ret);?? ???????????????return?ret;?? ???????????}?? ????????????? ???????????return?publishing(source);?? ???????}?? ???????default:?{?? ???????????ret?=?ERROR_SYSTEM_CLIENT_INVALID;?? ???????????srs_info("invalid?client?type=%d.?ret=%d",?type,?ret);?? ???????????return?ret;?? ???????}?? ???}??
大流程看。好像是根據不同走到了發布或者播放流程里。但首先。這個type是從哪里來的。怎么沒有發現呢?
[cpp]?view plaincopy
int?SrsRtmpServer::identify_client(int?stream_id,?SrsRtmpConnType&?type,?string&?stream_name,?double&?duration)??
在這個函數里做確認類型的。rmptserver類不在我們這次分析。
我們分析下play的流程,函數名稱為
[cpp]?view plaincopy
int?SrsRtmpConn::playing(SrsSource*?source)??
關鍵代碼
[cpp]?view plaincopy
SrsConsumer*?consumer?=?NULL;?? ???if?((ret?=?source->create_consumer(this,?consumer))?!=?ERROR_SUCCESS)?{?? ???????srs_error("create?consumer?failed.?ret=%d",?ret);?? ???????return?ret;?? ???}?? ???SrsAutoFree(SrsConsumer,?consumer);?? ???srs_verbose("consumer?created?success.");??
?利用source創建一個consumer.創建代碼為
[cpp]?view plaincopy
int?SrsSource::create_consumer(SrsConnection*?conn,?SrsConsumer*&?consumer,?bool?ds,?bool?dm,?bool?dg)?? {?? ????int?ret?=?ERROR_SUCCESS;?? ?????? ????consumer?=?new?SrsConsumer(this,?conn);?? ????consumers.push_back(consumer);?? ?????? ????double?queue_size?=?_srs_config->get_queue_length(_req->vhost);?? ????consumer->set_queue_size(queue_size);?? ?????? ?????? ????if?(atc?&&?!gop_cache->empty())?{?? ????????if?(cache_metadata)?{?? ????????????cache_metadata->timestamp?=?gop_cache->start_time();?? ????????}?? ????????if?(cache_sh_video)?{?? ????????????cache_sh_video->timestamp?=?gop_cache->start_time();?? ????????}?? ????????if?(cache_sh_audio)?{?? ????????????cache_sh_audio->timestamp?=?gop_cache->start_time();?? ????????}?? ????}?? ?????? ?????? ????if?(dm?&&?cache_metadata?&&?(ret?=?consumer->enqueue(cache_metadata,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{?? ????????srs_error("dispatch?metadata?failed.?ret=%d",?ret);?? ????????return?ret;?? ????}?? ????srs_info("dispatch?metadata?success");?? ?????? ?????? ?????? ?????? ????if?(ds?&&?cache_sh_audio?&&?(ret?=?consumer->enqueue(cache_sh_audio,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{?? ????????srs_error("dispatch?audio?sequence?header?failed.?ret=%d",?ret);?? ????????return?ret;?? ????}?? ????srs_info("dispatch?audio?sequence?header?success");?? ?? ????if?(ds?&&?cache_sh_video?&&?(ret?=?consumer->enqueue(cache_sh_video,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{?? ????????srs_error("dispatch?video?sequence?header?failed.?ret=%d",?ret);?? ????????return?ret;?? ????}?? ????srs_info("dispatch?video?sequence?header?success");?? ?????? ?????? ????if?(dg?&&?(ret?=?gop_cache->dump(consumer,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{?? ????????return?ret;?? ????}?? ?????? ?????? ????if?(dg)?{?? ????????srs_trace("create?consumer,?queue_size=%.2f,?jitter=%d",?queue_size,?jitter_algorithm);?? ????}?else?{?? ????????srs_trace("create?consumer,?ignore?gop?cache,?jitter=%d",?jitter_algorithm);?? ????}?? ?? ?????? ????if?(_srs_config->get_vhost_is_edge(_req->vhost))?{?? ?????????? ????????if?((ret?=?play_edge->on_client_play())?!=?ERROR_SUCCESS)?{?? ????????????srs_error("notice?edge?start?play?stream?failed.?ret=%d",?ret);?? ????????????return?ret;?? ????????}?? ????}?? ?????? ????return?ret;?? }??
代碼好長。主要是創建 放進數據結構中,并拷貝一些metadata進去,對于edge的處理,還沒有看明白。
之后的動作
[cpp]?view plaincopy
SrsQueueRecvThread?trd(consumer,?rtmp,?SRS_PERF_MW_SLEEP);?? ?????? ?????? ????if?((ret?=?trd.start())?!=?ERROR_SUCCESS)?{?? ????????srs_error("start?isolate?recv?thread?failed.?ret=%d",?ret);?? ????????return?ret;?? ????}??
什么?單獨創建了一個接受線程,實現了recv never send,send never recv,據說這樣效率提高了33%
在繞回去
[cpp]?view plaincopy
?? wakable?=?consumer;?? ret?=?do_playing(source,?consumer,?&trd);?? wakable?=?NULL;??
進入下一個循環體do_playing()
在分析下一個函數之前。讓我總結下縮做的工作
1)創建或者獲取了一個source
2)創建一個consumer
3) 創建一個接受線程
下面開始看函數的關鍵代碼
[cpp]?view plaincopy
?? ????realtime?=?_srs_config->get_realtime_enabled(req->vhost);?? ?????? ?????? ????mw_enabled?=?true;?? ????change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));?? ?????? ????send_min_interval?=?_srs_config->get_send_min_interval(req->vhost);??
做實時性 merge write 的設置
[cpp]?view plaincopy
while?(!trd->empty())?{?? ??????????SrsCommonMessage*?msg?=?trd->pump();?? ??????????srs_verbose("pump?client?message?to?process.");?? ???????????? ??????????if?((ret?=?process_play_control_msg(consumer,?msg))?!=?ERROR_SUCCESS)?{?? ??????????????if?(!srs_is_system_control_error(ret)?&&?!srs_is_client_gracefully_close(ret))?{?? ??????????????????srs_error("process?play?control?message?failed.?ret=%d",?ret);?? ??????????????}?? ??????????????return?ret;?? ??????????}?? ??????}??
首先處理接受消息。主要是暫停的消息。
下面進入核心代碼
[cpp]?view plaincopy
<strong><span?style="color:#ff6666;">?int?count?=?(send_min_interval?>?0)??1?:?0;?? ????????if?((ret?=?consumer->dump_packets(&msgs,?count))?!=?ERROR_SUCCESS)?{?? ????????????srs_error("get?messages?from?consumer?failed.?ret=%d",?ret);?? ????????????return?ret;?? ????????}</span></strong>??
這段代碼的作用就是,把消息,從consumer里,拷貝到本地的msg隊列里。當然。這個拷貝是淺拷貝,只是指針過來了。
首先看msgs的定義
[cpp]?view plaincopy
SrsMessageArray?msgs(SRS_PERF_MW_MSGS)??
這個類里有個核心變量
[cpp]?view plaincopy
SrsSharedPtrMessage**?msgs;??
可以看到它保存的是一個指向指針的指針。
那么dump_packets是怎么實現的呢?
[cpp]?view plaincopy
int?SrsConsumer::dump_packets(SrsMessageArray*?msgs,?int&?count)?? {?? ????int?ret?=ERROR_SUCCESS;?? ?????? ????srs_assert(count?>=?0);?? ????srs_assert(msgs->max?>?0);?? ?????? ?????? ????int?max?=?count??srs_min(count,?msgs->max)?:?msgs->max;?? ?????? ?????? ?????? ????count?=?0;?? ?????? ????if?(should_update_source_id)?{?? ????????srs_trace("update?source_id=%d[%d]",?source->source_id(),?source->source_id());?? ????????should_update_source_id?=?false;?? ????}?? ?????? ?????? ????if?(paused)?{?? ????????return?ret;?? ????}?? ?? ?????? ????if?((ret?=?queue->dump_packets(max,?msgs->msgs,?count))?!=?ERROR_SUCCESS)?{?? ????????return?ret;?? ????}?? ?????? ????return?ret;?? }??
[cpp]?view plaincopy
int?SrsMessageQueue::dump_packets(int?max_count,?SrsSharedPtrMessage**?pmsgs,?int&?count)?? {?? ????int?ret?=?ERROR_SUCCESS;?? ?????? ????int?nb_msgs?=?(int)msgs.size();?? ????if?(nb_msgs?<=?0)?{?? ????????return?ret;?? ????}?? ?????? ????srs_assert(max_count?>?0);?? ????count?=?srs_min(max_count,?nb_msgs);?? ?? ????SrsSharedPtrMessage**?omsgs?=?msgs.data();?? ????for?(int?i?=?0;?i?<?count;?i++)?{?? ????????pmsgs[i]?=?omsgs[i];?? ????}?? ?????? ????SrsSharedPtrMessage*?last?=?omsgs[count?-?1];?? ????av_start_time?=?last->timestamp;?? ?????? ????if?(count?>=?nb_msgs)?{?? ?????????? ????????msgs.clear();?? ????}?else?{?? ?????????? ?????????? ?????????? ?????????? ????????msgs.erase(msgs.begin(),?msgs.begin()?+?count);?? ????}?? ?????? ????return?ret;?? }??
ok代碼我就不想分析了。只是個指針拷貝。
下一個問題。consumer的數據是怎么來的呢?
看source的代碼,比如音頻數據
[cpp]?view plaincopy
int?SrsSource::on_audio_imp(SrsSharedPtrMessage*?msg)??
代碼里有這么一段
[cpp]?view plaincopy
?? ???if?(!drop_for_reduce)?{?? ???????for?(int?i?=?0;?i?<?(int)consumers.size();?i++)?{?? ???????????SrsConsumer*?consumer?=?consumers.at(i);?? ???????????if?((ret?=?consumer->enqueue(msg,?atc,?jitter_algorithm))?!=?ERROR_SUCCESS)?{?? ???????????????srs_error("dispatch?the?audio?failed.?ret=%d",?ret);?? ???????????????return?ret;?? ???????????}?? ???????}?? ???????srs_info("dispatch?audio?success.");?? ???}??
到此,一個循環就結束了。
總結
以上是生活随笔為你收集整理的srs代码学习(4)-怎么转发流的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。