live555 源码分析:MediaServer
位于 live555 項目 mediaServer 目錄下的是 “LIVE555 Media Server”,它是一個完整的 RTSP 服務器應用程序。它可以把多種媒體文件轉為流,提供給請求者。
這里來看一下 “LIVE555 Media Server” 的實現。拋開其中向終端輸出應用程序信息的代碼, “LIVE555 Media Server” 主程序的代碼像下面這樣:
#include <BasicUsageEnvironment.hh> #include "DynamicRTSPServer.hh" #include "version.hh"int main(int argc, char** argv) {// Begin by setting up our usage environment:TaskScheduler* scheduler = BasicTaskScheduler::createNew();UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);UserAuthenticationDatabase* authDB = NULL; #ifdef ACCESS_CONTROL// To implement client access control to the RTSP server, do the following:authDB = new UserAuthenticationDatabase;authDB->addUserRecord("username1", "password1"); // replace these with real strings// Repeat the above with each <username>, <password> that you wish to allow// access to the server. #endif// Create the RTSP server. Try first with the default port number (554),// and then with the alternative port number (8554):RTSPServer* rtspServer;portNumBits rtspServerPortNum = 554;rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);if (rtspServer == NULL) {rtspServerPortNum = 8554;rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);}if (rtspServer == NULL) {*env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";exit(1);}. . . . . .if (rtspServer->setUpTunnelingOverHTTP(80) || rtspServer->setUpTunnelingOverHTTP(8000) || rtspServer->setUpTunnelingOverHTTP(8080)) {*env << "(We use port " << rtspServer->httpServerPortNum() << " for optional RTSP-over-HTTP tunneling, or for HTTP live streaming (for indexed Transport Stream files only).)\n";} else {*env << "(RTSP-over-HTTP tunneling is not available.)\n";}env->taskScheduler().doEventLoop(); // does not returnreturn 0; // only to prevent compiler warning }“LIVE555 Media Server” 的主程序非常簡單,做的事情僅僅如下面這樣:
1. 創建 TaskScheduler 用于執行任務調度。關于 TaskScheduler 的更詳細信息可以參考 live555 源碼分析:基礎設施。
2. 創建 UsageEnvironment 用于日志輸出等 I/O 操作。關于 UsageEnvironment 的更詳細信息可以參考 live555 源碼分析:基礎設施。
3. 創建 RTSPServer 用于接受連接、處理請求等。這里會首先嘗試使用 554 端口,如果失敗,就嘗試使用 8554 端口,如果兩個端口都沒法用,就失敗退出程序。
4. 為 RTSPServer 設置 HTTP 隧道端口。這里會依次嘗試使用 80、8000 和 8080 端口。
5. 執行事件循環。
TaskScheduler 用于執行任務調度,但要監聽的 socket,以及 socket 上的 I/O 事件的處理程序,則需要 RTSPServer,也就是 DynamicRTSPServer 提供。DynamicRTSPServer 是 “LIVE555 Media Server” 應用程序的核心,其定義如下:
#ifndef _RTSP_SERVER_SUPPORTING_HTTP_STREAMING_HH
#include "RTSPServerSupportingHTTPStreaming.hh"
#endif

// RTSP server that looks up ServerMediaSessions dynamically, by stream name
// (see the redefined lookupServerMediaSession() below).
class DynamicRTSPServer: public RTSPServerSupportingHTTPStreaming {
public:
  static DynamicRTSPServer* createNew(UsageEnvironment& env, Port ourPort,
                                      UserAuthenticationDatabase* authDatabase,
                                      unsigned reclamationTestSeconds = 65);

protected:
  DynamicRTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort,
                    UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds);
  // called only by createNew();
  virtual ~DynamicRTSPServer();

protected: // redefined virtual functions
  virtual ServerMediaSession*
  lookupServerMediaSession(char const* streamName, Boolean isFirstLookupInSession);
};

DynamicRTSPServer 的類層次結構如下圖:
要監聽的 socket 以及 socket 上的 I/O 事件的處理程序是在 DynamicRTSPServer 創建的過程中注冊給 TaskScheduler 的。DynamicRTSPServer 需要通過其靜態函數 createNew() 創建,該函數定義如下:
// Factory: creates the listening socket first, then the server object.
// Returns NULL if the socket could not be set up on the requested port.
DynamicRTSPServer*
DynamicRTSPServer::createNew(UsageEnvironment& env, Port ourPort,
                             UserAuthenticationDatabase* authDatabase,
                             unsigned reclamationTestSeconds) {
  int ourSocket = setUpOurSocket(env, ourPort);
  if (ourSocket == -1) return NULL;

  return new DynamicRTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
}

// Simply forwards to the base-class constructor chain.
DynamicRTSPServer::DynamicRTSPServer(UsageEnvironment& env, int ourSocket,
                                     Port ourPort,
                                     UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds)
  : RTSPServerSupportingHTTPStreaming(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds) {
}

在 DynamicRTSPServer 的 createNew() 中,首先創建一個 socket,然后用這個 socket 創建 DynamicRTSPServer 對象。構造函數順著類繼承層次一級級執行上去。
RTSPServerSupportingHTTPStreaming 構造函數定義如下:
// Pass-through constructor: adds no state of its own here, just chains to RTSPServer.
RTSPServerSupportingHTTPStreaming
::RTSPServerSupportingHTTPStreaming(UsageEnvironment& env, int ourSocket, Port rtspPort,
                                    UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds)
  : RTSPServer(env, ourSocket, rtspPort, authDatabase, reclamationTestSeconds) {
}

RTSPServer 構造函數定義如下:
// Initializes RTSP-specific state (HTTP tunneling socket/port, TCP-streaming
// and register/deregister tables) on top of GenericMediaServer.
RTSPServer::RTSPServer(UsageEnvironment& env,
                       int ourSocket, Port ourPort,
                       UserAuthenticationDatabase* authDatabase,
                       unsigned reclamationSeconds)
  : GenericMediaServer(env, ourSocket, ourPort, reclamationSeconds),
    fHTTPServerSocket(-1), fHTTPServerPort(0),
    fClientConnectionsForHTTPTunneling(NULL), // will get created if needed
    fTCPStreamingDatabase(HashTable::create(ONE_WORD_HASH_KEYS)),
    fPendingRegisterOrDeregisterRequests(HashTable::create(ONE_WORD_HASH_KEYS)),
    fRegisterOrDeregisterRequestCounter(0), fAuthDB(authDatabase), fAllowStreamingRTPOverTCP(True) {
}

GenericMediaServer 構造函數定義如下:
// Sets up the session/connection tables and — crucially — registers the
// listening server socket (and incomingConnectionHandler) with the
// TaskScheduler, so new client connections are handled by the event loop.
GenericMediaServer
::GenericMediaServer(UsageEnvironment& env, int ourSocket, Port ourPort,
                     unsigned reclamationSeconds)
  : Medium(env),
    fServerSocket(ourSocket), fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
    fClientSessions(HashTable::create(STRING_HASH_KEYS)) {
  ignoreSigPipeOnSocket(fServerSocket); // so that clients on the same host that are killed don't also kill us

  // Arrange to handle connections from others:
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, incomingConnectionHandler, this);
}

在 GenericMediaServer 的構造函數中,要監聽的 Server socket 及該 socket 上的 I/O 事件處理程序,被注冊給任務調度器。事件循環中檢測到 socket 上出現 I/O 事件時,該處理程序會被調用到。注冊的事件處理程序為 GenericMediaServer::incomingConnectionHandler() 。
Medium 構造函數定義如下:
// Generates a unique medium name and registers this object in the
// per-environment MediaLookupTable.
Medium::Medium(UsageEnvironment& env)
  : fEnviron(env), fNextTask(NULL) {
  // First generate a name for the new medium:
  MediaLookupTable::ourMedia(env)->generateNewName(fMediumName, mediumNameMaxLen);
  env.setResultMsg(fMediumName);

  // Then add it to our table:
  MediaLookupTable::ourMedia(env)->addNew(this, fMediumName);
}

在 Medium 中,主要通過 MediaLookupTable 維護一個 medium name 到 Medium 對象的映射表。MediaLookupTable 可以看作是 BasicHashTable 對值類型為 Medium 對象指針的特化,該類定義如下:
// Per-environment registry mapping medium names to Medium objects.
// Only Medium (a friend) may look up, add, remove, or generate names.
class MediaLookupTable {
public:
  static MediaLookupTable* ourMedia(UsageEnvironment& env);
  HashTable const& getTable() { return *fTable; }

protected:
  MediaLookupTable(UsageEnvironment& env);
  virtual ~MediaLookupTable();

private:
  friend class Medium;

  Medium* lookup(char const* name) const;
  // Returns NULL if none already exists

  void addNew(Medium* medium, char* mediumName);
  void remove(char const* name);

  void generateNewName(char* mediumName, unsigned maxLen);

private:
  UsageEnvironment& fEnv;
  HashTable* fTable;
  unsigned fNameGenerator; // counter used to build unique "liveMediaN" names
};

MediaLookupTable 的引用實際由 UsageEnvironment 持有:
// Lazily creates the per-environment _Tables structure, stored in
// env.liveMediaPriv.
_Tables* _Tables::getOurTables(UsageEnvironment& env, Boolean createIfNotPresent) {
  if (env.liveMediaPriv == NULL && createIfNotPresent) {
    env.liveMediaPriv = new _Tables(env);
  }
  return (_Tables*)(env.liveMediaPriv);
}
. . . . . .
// Returns the environment's MediaLookupTable, creating it on first use.
MediaLookupTable* MediaLookupTable::ourMedia(UsageEnvironment& env) {
  _Tables* ourTables = _Tables::getOurTables(env);
  if (ourTables->mediaTable == NULL) {
    // Create a new table to record the media that are to be created in
    // this environment:
    ourTables->mediaTable = new MediaLookupTable(env);
  }
  return ourTables->mediaTable;
}
. . . . . .
// Writes a fresh "liveMediaN" name into mediumName (maxLen is ignored here).
void MediaLookupTable::generateNewName(char* mediumName,
                                       unsigned /*maxLen*/) {
  // We should really use snprintf() here, but not all systems have it
  sprintf(mediumName, "liveMedia%d", fNameGenerator++);
}

MediaLookupTable::MediaLookupTable(UsageEnvironment& env)
  : fEnv(env), fTable(HashTable::create(STRING_HASH_KEYS)), fNameGenerator(0) {
}

medium name 在 Medium 構造過程中分配,創建的 Medium 也在此時被加入 MediaLookupTable 中:
// Records a (name -> Medium*) entry in the table.
void MediaLookupTable::addNew(Medium* medium, char* mediumName) {
  fTable->Add(mediumName, (void*)medium);
}
. . . . . .
// Writes a fresh "liveMediaN" name into mediumName (maxLen is ignored here).
void MediaLookupTable::generateNewName(char* mediumName,
                                       unsigned /*maxLen*/) {
  // We should really use snprintf() here, but not all systems have it
  sprintf(mediumName, "liveMedia%d", fNameGenerator++);
}

Server socket 的創建及連接建立
在 DynamicRTSPServer 的 createNew() 中,創建 DynamicRTSPServer 對象之前,會先通過 setUpOurSocket() 創建一個 socket,setUpOurSocket() 是 GenericMediaServer 的一個靜態函數,其定義為:
// Creates the listening TCP socket for the server: sets it up via
// setupStreamSocket(), enlarges its send buffer, and puts it in the
// listening state. Returns -1 (after closing the socket) on any failure.
int GenericMediaServer::setUpOurSocket(UsageEnvironment& env, Port& ourPort) {
  int ourSocket = -1;

  do {
    // The following statement is enabled by default.
    // Don't disable it (by defining ALLOW_SERVER_PORT_REUSE) unless you know what you're doing.
#if !defined(ALLOW_SERVER_PORT_REUSE) && !defined(ALLOW_RTSP_SERVER_PORT_REUSE)
    // ALLOW_RTSP_SERVER_PORT_REUSE is for backwards-compatibility #####
    NoReuse dummy(env); // Don't use this socket if there's already a local server using it
#endif

    ourSocket = setupStreamSocket(env, ourPort);
    if (ourSocket < 0) break;

    // Make sure we have a big send buffer:
    if (!increaseSendBufferTo(env, ourSocket, 50*1024)) break;

    // Allow multiple simultaneous connections:
    if (listen(ourSocket, LISTEN_BACKLOG_SIZE) < 0) {
      env.setResultErrMsg("listen() failed: ");
      break;
    }

    if (ourPort.num() == 0) {
      // bind() will have chosen a port for us; return it also:
      if (!getSourcePort(env, ourSocket, ourPort)) break;
    }

    return ourSocket;
  } while (0);

  if (ourSocket != -1) ::closeSocket(ourSocket);
  return -1;
}

GenericMediaServer::setUpOurSocket() 主要即是,通過 setupStreamSocket() 函數創建一個 TCP socket,增大該 socket 的發送緩沖區,并 listen 該 socket。
setupStreamSocket() 函數在 groupsock 模塊中定義:
// Lazily creates the per-environment groupsock private data
// (socket table + reuse flag), stored in env.groupsockPriv.
_groupsockPriv* groupsockPriv(UsageEnvironment& env) {
  if (env.groupsockPriv == NULL) { // We need to create it
    _groupsockPriv* result = new _groupsockPriv;
    result->socketTable = NULL;
    result->reuseFlag = 1; // default value => allow reuse of socket numbers
    env.groupsockPriv = result;
  }
  return (_groupsockPriv*)(env.groupsockPriv);
}

// Frees the private data again when it holds only default values.
void reclaimGroupsockPriv(UsageEnvironment& env) {
  _groupsockPriv* priv = (_groupsockPriv*)(env.groupsockPriv);
  if (priv->socketTable == NULL && priv->reuseFlag == 1/*default value*/) {
    // We can delete the structure (to save space); it will get created again, if needed:
    delete priv;
    env.groupsockPriv = NULL;
  }
}

static int createSocket(int type) {
  // Call "socket()" to create a (IPv4) socket of the specified type.
  // But also set it to have the 'close on exec' property (if we can)
  int sock;

#ifdef SOCK_CLOEXEC
  sock = socket(AF_INET, type|SOCK_CLOEXEC, 0);
  if (sock != -1 || errno != EINVAL) return sock;
  // An "errno" of EINVAL likely means that the system wasn't happy with the SOCK_CLOEXEC; fall through and try again without it:
#endif
  sock = socket(AF_INET, type, 0);
#ifdef FD_CLOEXEC
  if (sock != -1) fcntl(sock, F_SETFD, FD_CLOEXEC);
#endif
  return sock;
}
. . . . . .
// Creates a TCP socket, applies SO_REUSEADDR (and optionally SO_REUSEPORT),
// binds it to the requested port/interface, and optionally makes it
// non-blocking. Returns the socket fd, or -1 on failure.
int setupStreamSocket(UsageEnvironment& env,
                      Port port, Boolean makeNonBlocking) {
  if (!initializeWinsockIfNecessary()) {
    socketErr(env, "Failed to initialize 'winsock': ");
    return -1;
  }

  int newSocket = createSocket(SOCK_STREAM);
  if (newSocket < 0) {
    socketErr(env, "unable to create stream socket: ");
    return newSocket;
  }

  int reuseFlag = groupsockPriv(env)->reuseFlag;
  reclaimGroupsockPriv(env);
  if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEADDR,
                 (const char*)&reuseFlag, sizeof reuseFlag) < 0) {
    socketErr(env, "setsockopt(SO_REUSEADDR) error: ");
    closeSocket(newSocket);
    return -1;
  }

  // SO_REUSEPORT doesn't really make sense for TCP sockets, so we
  // normally don't set them.  However, if you really want to do this
  // #define REUSE_FOR_TCP
#ifdef REUSE_FOR_TCP
#if defined(__WIN32__) || defined(_WIN32)
  // Windoze doesn't properly handle SO_REUSEPORT
#else
#ifdef SO_REUSEPORT
  if (setsockopt(newSocket, SOL_SOCKET, SO_REUSEPORT,
                 (const char*)&reuseFlag, sizeof reuseFlag) < 0) {
    socketErr(env, "setsockopt(SO_REUSEPORT) error: ");
    closeSocket(newSocket);
    return -1;
  }
#endif
#endif
#endif

  // Note: Windoze requires binding, even if the port number is 0
#if defined(__WIN32__) || defined(_WIN32)
#else
  if (port.num() != 0 || ReceivingInterfaceAddr != INADDR_ANY) {
#endif
    MAKE_SOCKADDR_IN(name, ReceivingInterfaceAddr, port.num());
    if (bind(newSocket, (struct sockaddr*)&name, sizeof name) != 0) {
      char tmpBuffer[100];
      sprintf(tmpBuffer, "bind() error (port number: %d): ",
              ntohs(port.num()));
      socketErr(env, tmpBuffer);
      closeSocket(newSocket);
      return -1;
    }
#if defined(__WIN32__) || defined(_WIN32)
#else
  }
#endif

  if (makeNonBlocking) {
    if (!makeSocketNonBlocking(newSocket)) {
      socketErr(env, "failed to make non-blocking: ");
      closeSocket(newSocket);
      return -1;
    }
  }

  return newSocket;
}

在這里基本上就是創建 TCP socket,為 socket 設置 REUSE 選項,將 socket 綁定到目標端口,并根據需要設置 socket 為非阻塞的。
接著來看,在 live555 中是如何啟動監聽 server socket 上的 I/O 事件,并處理這些事件的。
我們看到,在 GenericMediaServer 的構造函數中,用于接收客戶端發起的連接的 server socket,及該 socket 上的 I/O 事件處理程序,被注冊給任務調度器。當有客戶端發起了到 “LIVE555 Media Server” 的連接時,該處理程序會被調用到。事件處理程序為 GenericMediaServer::incomingConnectionHandler() ,相關的幾個函數聲明為:
// Excerpt of the GenericMediaServer declaration: the connection-accepting
// handlers, plus the factory method that subclasses must implement to wrap
// an accepted socket in a ClientConnection object.
class GenericMediaServer: public Medium {
. . . . . .
protected:
. . . . . .
  static void incomingConnectionHandler(void*, int /*mask*/);
  void incomingConnectionHandler();
  void incomingConnectionHandlerOnSocket(int serverSocket);
. . . . . .
protected:
  virtual ClientConnection* createNewClientConnection(int clientSocket, struct sockaddr_in clientAddr) = 0;

GenericMediaServer::incomingConnectionHandler()為靜態函數,incomingConnectionHandler() 及 incomingConnectionHandlerOnSocket(int serverSocket) 為非虛函數,它們的定義為:
// Static trampoline registered with the TaskScheduler; forwards to the
// member-function handler on the server instance.
void GenericMediaServer::incomingConnectionHandler(void* instance, int /*mask*/) {
  GenericMediaServer* server = (GenericMediaServer*)instance;
  server->incomingConnectionHandler();
}

void GenericMediaServer::incomingConnectionHandler() {
  incomingConnectionHandlerOnSocket(fServerSocket);
}

// Accepts one pending client connection, tunes the new socket
// (non-blocking, bigger send buffer), and hands it to the subclass's
// createNewClientConnection() factory.
void GenericMediaServer::incomingConnectionHandlerOnSocket(int serverSocket) {
  struct sockaddr_in clientAddr;
  SOCKLEN_T clientAddrLen = sizeof clientAddr;
  int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientAddrLen);
  if (clientSocket < 0) {
    int err = envir().getErrno();
    if (err != EWOULDBLOCK) {
      envir().setResultErrMsg("accept() failed: ");
    }
    return;
  }
  ignoreSigPipeOnSocket(clientSocket); // so that clients on the same host that are killed don't also kill us
  makeSocketNonBlocking(clientSocket);
  increaseSendBufferTo(envir(), clientSocket, 50*1024);

#ifdef DEBUG
  envir() << "accept()ed connection from " << AddressString(clientAddr).val() << "\n";
#endif

  // Create a new object for handling this connection:
  (void)createNewClientConnection(clientSocket, clientAddr);
}

可以看到,當監聽的 server socket 上發現有客戶端發起的連接請求時,處理過程大體為:
1. 通過 accept() 獲得新建立的連接的 socket。
2. 設置新 socket 的選項,使它成為非阻塞 socket,并增大該 socket 的發送緩沖區。可以看一下在 live555 中設置 socket 選項的這些函數,它們在 groupsock 模塊中定義:
DynamicRTSPServer -> RTSPServerSupportingHTTPStreaming -> RTSPServer -> GenericMediaServer 這個繼承層次,從 DynamicRTSPServer 類開始,逐級向上找,可以發現 createNewClientConnection() 函數的實現在 RTSPServerSupportingHTTPStreaming 類中。
RTSPServerSupportingHTTPStreaming 類的 createNewClientConnection() 函數定義如下:
這里簡單地創建一個 RTSPClientConnectionSupportingHTTPStreaming 類對象。在 live555 中,用 GenericMediaServer 的內部類 GenericMediaServer::ClientConnection 表示一個客戶端連接。對于 “LIVE555 Media Server” 而言,這個類的繼承層次體系結構如下圖所示:
RTSPClientConnectionSupportingHTTPStreaming 類對象構造函數調用其父類 RTSPServer::RTSPClientConnection 的構造函數,后者的實現為:
// Initializes per-connection RTSP state; input and output initially use the
// same socket (they diverge only for RTSP-over-HTTP tunneling).
RTSPServer::RTSPClientConnection
::RTSPClientConnection(RTSPServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : GenericMediaServer::ClientConnection(ourServer, clientSocket, clientAddr),
    fOurRTSPServer(ourServer), fClientInputSocket(fOurSocket), fClientOutputSocket(fOurSocket),
    fIsActive(True), fRecursionCount(0), fOurSessionCookie(NULL) {
  resetRequestBuffer();
}

RTSPServer::RTSPClientConnection 的構造函數繼續調用其父類 GenericMediaServer::ClientConnection 的構造函數:
// Registers this connection in the server's table and arranges for
// incomingRequestHandler() to be called by the scheduler whenever the
// client socket becomes readable (or raises an exception condition).
GenericMediaServer::ClientConnection
::ClientConnection(GenericMediaServer& ourServer, int clientSocket, struct sockaddr_in clientAddr)
  : fOurServer(ourServer), fOurSocket(clientSocket), fClientAddr(clientAddr) {
  // Add ourself to our 'client connections' table:
  fOurServer.fClientConnections->Add((char const*)this, this);

  // Arrange to handle incoming requests:
  resetRequestBuffer();
  envir().taskScheduler().setBackgroundHandling(fOurSocket, SOCKET_READABLE|SOCKET_EXCEPTION, incomingRequestHandler, this);
}

在 live555 中,GenericMediaServer 用于執行通用的媒體服務器相關的管理,包括管理所有的客戶端連接。GenericMediaServer::ClientConnection 的構造函數中會將其自身加入 GenericMediaServer 的哈希表 fClientConnections 中。更重要的是,將客戶端連接的 socket 及該 socket 上的 I/O 事件處理程序注冊給了任務調度器,其中 I/O 事件處理程序為 GenericMediaServer::ClientConnection::incomingRequestHandler() 函數。
也就是說,”LIVE555 Media Server” 中與客戶端的交互將由 GenericMediaServer::ClientConnection::incomingRequestHandler() 函數完成,這個函數定義如下:
// Static trampoline registered with the TaskScheduler; forwards to the
// member-function handler on the connection instance.
void GenericMediaServer::ClientConnection::incomingRequestHandler(void* instance, int /*mask*/) {
  ClientConnection* connection = (ClientConnection*)instance;
  connection->incomingRequestHandler();
}

// Reads available bytes from the client socket into the request buffer,
// then lets the (subclass-implemented) handleRequestBytes() parse them.
void GenericMediaServer::ClientConnection::incomingRequestHandler() {
  struct sockaddr_in dummy; // 'from' address, meaningless in this case
  int bytesRead = readSocket(envir(), fOurSocket, &fRequestBuffer[fRequestBytesAlreadySeen], fRequestBufferBytesLeft, dummy);
  handleRequestBytes(bytesRead);
}

在 GenericMediaServer::ClientConnection 的 incomingRequestHandler() 中,會從客戶端連接的 socket 中讀取數據,然后調用 handleRequestBytes() 函數做進一步的處理。
讀取數據的動作由 groupsock 模塊中的 readSocket() 完成:
// Reads from the socket via recvfrom(), recording the sender's address.
// Certain Linux/Windows errno values are masked as "0 bytes read" (see the
// HACK comment); 0 bytes on a stream socket is mapped to -1 (peer closed).
int readSocket(UsageEnvironment& env,
               int socket, unsigned char* buffer, unsigned bufferSize,
               struct sockaddr_in& fromAddress) {
  SOCKLEN_T addressSize = sizeof fromAddress;
  int bytesRead = recvfrom(socket, (char*)buffer, bufferSize, 0,
                           (struct sockaddr*)&fromAddress,
                           &addressSize);
  if (bytesRead < 0) {
    //##### HACK to work around bugs in Linux and Windows:
    int err = env.getErrno();
    if (err == 111 /*ECONNREFUSED (Linux)*/
. . . . . .
        || err == EAGAIN
#endif
        || err == 113 /*EHOSTUNREACH (Linux)*/) { // Why does Linux return this for datagram sock?
      fromAddress.sin_addr.s_addr = 0;
      return 0;
    }
    //##### END HACK
    socketErr(env, "recvfrom() error: ");
  } else if (bytesRead == 0) {
    // "recvfrom()" on a stream socket can return 0 if the remote end has closed the connection. Treat this as an error:
    return -1;
  }

  return bytesRead;
}

這里通過 recvfrom() 函數從 socket 中讀取數據。
GenericMediaServer::ClientConnection 中,處理客戶端請求的這幾個函數聲明如下:
// Request-handling declarations inside GenericMediaServer::ClientConnection:
static void incomingRequestHandler(void*, int /*mask*/);
void incomingRequestHandler();
virtual void handleRequestBytes(int newBytesRead) = 0; // implemented by subclasses

handleRequestBytes() 為純虛函數,需要由子類實現。對于 RTSPServerSupportingHTTPStreaming::RTSPClientConnectionSupportingHTTPStreaming -> RTSPServer::RTSPClientConnection -> GenericMediaServer::ClientConnection 這個繼承層次體系,該函數的實現實際上位于 RTSPServer::RTSPClientConnection 中。
從客戶端接收到的請求,將由 RTSPServer::RTSPClientConnection::handleRequestBytes() 處理。
live555 源碼分析系列文章
live555 源碼分析:簡介
live555 源碼分析:基礎設施
live555 源碼分析:MediaSever
總結
以上是生活随笔為你收集整理的live555 源码分析:MediaServer的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: live555 源码分析:基础设施
- 下一篇: Wireshark 抓包分析 RTSP/