nats streaming订阅
我這里分析的是由nats streaming啟動nats的,不連接外部nats
1.在nats streaming第一章,已經說明nats streaming會啟動客戶端連接nats,然后注冊消息回調,包括客戶端連接,訂閱,發布等消息回調
2.訂閱客戶端發送訂閱消息,服務端調用readLoop()進行消息讀取,c.parse消息解析,->(c *client) processSub->(c *client) processMsg
for循環調用訂閱事件監聽者,
3.上面都還算是nats服務器調用,接著消息轉發到nats streaming,readLoop->(nc *Conn) processMsgArgs(在這里確定事件的調用者,這里認為有個設計不好的地方,事件調用者確認是通過nc.ps.ma.sid,而這個值是按依賴于初始化自增的,個人認為固定事件id更好,而不是依賴于初始化順序,在這里訂閱事件的sid是4,)->(nc *Conn) processMsg(把消息丟入sub.pHead隊列)->
(nc *Conn) waitForMsgs(循環取出隊列消息進行處理,調用mcb(m)進行事件注冊回調)->(s *StanServer) processSubscriptionRequest(處理客戶端訂閱消息s.processSub(保存到持久化存儲),訂閱回復s.ncs.Publish)
根據object名稱建立對應通道,
?
然后把對應訂閱對象存入通道對象(channel.ss.psubs),這里區分是永久訂閱還是臨時訂閱,
以上就是大體的nats streaming訂閱流程。
?
總結
以上是生活随笔為你收集整理的nats streaming订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis和zk实现分布式锁的优缺点
- 下一篇: 基因编辑婴儿的意味着什么