ZLMediaKit接收ffmpeg rtmp推流
目錄
一 關鍵類
二 推流緩沖
webrtc拉流篇,可參考
https://mp.csdn.net/mp_blog/creation/editor/122743325
RTMP采用的封裝格式是FLV。所以在指定輸出流媒體的時候須要指定其封裝格式為“flv”。同理,其余流媒體協(xié)議也須要指定其封裝格式。例如采用UDP推送流媒體的時候,能夠指定其封裝格式為“mpegts”。
一 關鍵類
| 環(huán)形緩沖,聚合了_RingStorage template<typename T> class RingBuffer : public enable_shared_from_this<RingBuffer<T> > { public: ? ? typedef std::shared_ptr<RingBuffer> Ptr; ? ? typedef _RingReader<T> RingReader; ? ? typedef _RingStorage<T> RingStorage; ? ? typedef _RingReaderDispatcher<T> RingReaderDispatcher; ? ? typedef function<void(int size)> onReaderChanged; ? ? RingBuffer(int max_size = 1024, const onReaderChanged &cb = nullptr) { ? ? ? ? _on_reader_changed = cb; ? ? ? ? _storage = std::make_shared<RingStorage>(max_size); ? ? } ? ? ~RingBuffer() {} ? ? void write(T in, bool is_key = true) { ? ? ? ? if (_delegate) { ? ? ? ? ? ? _delegate->onWrite(std::move(in), is_key); ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? LOCK_GUARD(_mtx_map); ? ? ? ? for (auto &pr : _dispatcher_map) { ? ? ? ? ? ? auto &second = pr.second; ? ? ? ? ? ? //切換線程后觸發(fā)onRead事件 ? ? ? ? ? ? pr.first->async([second, in, is_key]() { ? ? ? ? ? ? ? ? second->write(std::move(const_cast<T &>(in)), is_key); ? ? ? ? ? ? }, false); ? ? ? ? } ? ? ? ? _storage->write(std::move(in), is_key); ? ? } ? ? int readerCount() { ? ? ? ? return _total_count; ? ? } ? ? void clearCache(){ ? ? ? ? LOCK_GUARD(_mtx_map); ? ? ? ? _storage->clearCache(); ? ? ? ? for (auto &pr : _dispatcher_map) { ? ? ? ? ? ? auto &second = pr.second; ? ? ? ? ? ? //切換線程后清空緩存 ? ? ? ? ? ? pr.first->async([second]() { ? ? ? ? ? ? ? ? second->clearCache(); ? ? ? ? ? ? }, false); ? ? ? ? } ? ? } private: ? ? struct HashOfPtr { ? ? ? ? std::size_t operator()(const EventPoller::Ptr &key) const { ? ? ? ? ? ? return (std::size_t) key.get(); ? ? ? ? } ? ? }; private: ? ? mutex _mtx_map; ? ? atomic_int _total_count {0}; ? ? typename RingStorage::Ptr _storage; ? ? typename RingDelegate<T>::Ptr _delegate; ? ? onReaderChanged _on_reader_changed; ? ? unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map; }; |
| 存放ffmpeg rtmp推流,demuxer,解碼,重新編碼后的rtppacket數(shù)據(jù) template<typename T> class _RingStorage { public: ? ? typedef std::shared_ptr<_RingStorage> Ptr; ? ? _RingStorage(int max_size) { ? ? ? ? //gop緩存?zhèn)€數(shù)不能小于32 ? ? ? ? if(max_size < RING_MIN_SIZE){ ? ? ? ? ? ? max_size = RING_MIN_SIZE; ? ? ? ? } ? ? ? ? _max_size = max_size; ? ? } ? ? ~_RingStorage() {} ? ? /** ? ? ?* 寫入環(huán)形緩存數(shù)據(jù) ? ? ?* @param in 數(shù)據(jù) ? ? ?* @param is_key 是否為關鍵幀 ? ? ?* @return 是否觸發(fā)重置環(huán)形緩存大小 ? ? ?*/ ? ? ?void write(T in, bool is_key = true) { ? ? ? ? if (is_key) { ? ? ? ? ? ? //遇到I幀,那么移除老數(shù)據(jù) ? ? ? ? ? ? _size = 0; ? ? ? ? ? ? _have_idr = true; ? ? ? ? ? ? _data_cache.clear(); ? ? ? ? } ? ? ? ? if (!_have_idr) { ? ? ? ? ? ? //緩存中沒有關鍵幀,那么gop緩存無效 ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? _data_cache.emplace_back(std::make_pair(is_key, std::move(in))); ? ? ? ? if (++_size > _max_size) { ? ? ? ? ? ? //GOP緩存溢出,清空關老數(shù)據(jù) ? ? ? ? ? ? _size = 0; ? ? ? ? ? ? _have_idr = false; ? ? ? ? ? ? _data_cache.clear(); ? ? ? ? } ? ? } ? ? Ptr clone() const { ? ? ? ? Ptr ret(new _RingStorage()); ? ? ? ? ret->_size = _size; ? ? ? ? ret->_have_idr = _have_idr; ? ? ? ? ret->_max_size = _max_size; ? ? ? ? ret->_data_cache = _data_cache; ? ? ? ? return ret; ? ? } ? ? const List<pair<bool, T> > &getCache() const { ? ? ? ? return _data_cache; ? ? } ? ? void clearCache(){ ? ? ? ? _size = 0; ? ? ? ? _data_cache.clear(); ? ? } private: ? ? _RingStorage() = default; private: ? ? bool _have_idr = false; ? ? int _size = 0; ? ? int _max_size; ? ? List<pair<bool, T> > _data_cache; }; |
| webrtc從此處reader template<typename T> class _RingReader { public: ? ? typedef std::shared_ptr<_RingReader> Ptr; ? ? friend class _RingReaderDispatcher<T>; ? ? _RingReader(const std::shared_ptr<_RingStorage<T> > &storage, bool use_cache) { ? ? ? ? _storage = storage; ? ? ? ? _use_cache = use_cache; ? ? } ? ? ~_RingReader() {} ? ? void setReadCB(const function<void(const T &)> &cb) { ? ? ? ? if (!cb) { ? ? ? ? ? ? _read_cb = [](const T &) {}; ? ? ? ? } else { ? ? ? ? ? ? _read_cb = cb; ? ? ? ? ? ? flushGop(); ? ? ? ? } ? ? } ? ? void setDetachCB(const function<void()> &cb) { ? ? ? ? if (!cb) { ? ? ? ? ? ? _detach_cb = []() {}; ? ? ? ? } else { ? ? ? ? ? ? _detach_cb = cb; ? ? ? ? } ? ? } private: ? ? void onRead(const T &data, bool is_key) { ? ? ? ? _read_cb(data); ? ? } ? ? void onDetach() const { ? ? ? ? _detach_cb(); ? ? } ? ? void flushGop() { ? ? ? ? if (!_use_cache) { ? ? ? ? ? ? return; ? ? ? ? } ? ? ? ? _storage->getCache().for_each([&](const pair<bool, T> &pr) { ? ? ? ? ? ? onRead(pr.second, pr.first); ? ? ? ? }); ? ? } private: ? ? bool _use_cache; ? ? shared_ptr<_RingStorage<T> > _storage; ? ? function<void(void)> _detach_cb = []() {}; ? ? function<void(const T &)> _read_cb = [](const T &) {}; }; |
二 推流緩沖
void EventPoller::runLoop(bool blocked,bool regist_self)
bool Socket::attachEvent(const SockFD::Ptr &sock, bool is_udp) /lambda表達式
ssize_t Socket::onRead(const SockFD::Ptr &sock, bool is_udp) noexcept
void TcpServer::onAcceptConnection(const Socket::Ptr &sock) /lambda表達式
void RtmpSession::onRecv(const Buffer::Ptr &buf)
void RtmpProtocol::onParseRtmp(const char *data, size_t size)
void HttpRequestSplitter::input(const char *data,size_t len)
const char *RtmpProtocol::onSearchPacketTail(const char *data,size_t len)
const char* RtmpProtocol::handle_C2(const char *data, size_t len)
const char* RtmpProtocol::handle_rtmp(const char *data, size_t len)
void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet)
void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet)?
void onWrite(RtmpPacket::Ptr pkt, bool = true) override
void RtmpDemuxer::inputRtmp(const RtmpPacket::Ptr &pkt)?
void H264RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt)?
inline void H264RtmpDecoder::onGetH264(const char* data, size_t len, uint32_t dts, uint32_t pts)?
bool FrameDispatcher ::?inputFrame(const Frame::Ptr &frame) override
bool H264Track::inputFrame(const Frame::Ptr &frame)?
bool H264Track::inputFrame_l(const Frame::Ptr &frame)
?bool FrameWriterInterfaceHelper::inputFrame(const Frame::Ptr &frame) override
bool MediaSink::addTrack(const Track::Ptr &track_in)/lambda表達式
bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in)?
bool RtspMediaSourceMuxer::inputFrame(const Frame::Ptr &frame) override
bool RtspMuxer::inputFrame(const Frame::Ptr &frame)?
bool H264RtpEncoder::inputFrame(const Frame::Ptr &frame)?
bool H264RtpEncoder::inputFrame_l(const Frame::Ptr &frame, bool is_mark)
void H264RtpEncoder::packRtp(const char *ptr, size_t len, uint32_t pts, bool is_mark, bool gop_pos)
void H264RtpEncoder::packRtpFu(const char *ptr, size_t len, uint32_t pts, bool is_mark, bool gop_pos)
bool RtpRing::inputRtp(const RtpPacket::Ptr &rtp, bool key_pos)
void RingBuffer::write(T in, bool is_key = true)
ringbuffer超過maxsize 512之后,推流數(shù)據(jù)未被拉流的話,會清空。推流和拉流過程是獨立進行,比如只ffmepg推流,webrtc不拉流,播放放。
總結(jié)
以上是生活随笔為你收集整理的ZLMediaKit接收ffmpeg rtmp推流的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ssi 指令 php,SSI 漏洞学习笔
- 下一篇: 天龙架设linux环境配置,《果子资源》