[Conclusion]RabbitMQ-客户端源码之总结
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關(guān)注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-conclusion/
RabbitMQ遵從的是AMQP協(xié)議,其broker端代碼采用erlang編寫,對于沒有接觸過erlang的同學(xué)(包括博主我)來說,想要了解其中的奧秘實在是不容易,大多只能從網(wǎng)上“搜刮”點散碎的知識點來充實一下。但是這樣是不能究其然,更不能究其所以然。博主這里翻閱了amqp-client的java客戶端的源碼,通過其來學(xué)習下AMQP協(xié)議,進而更深刻的了解RabbitMQ.
注:如無特殊說明,本系列的文章采用的amqp-client版本均為3.5.3。
本系列的文章主要是來闡述客戶端與broker交互需要經(jīng)歷那些具體步驟,需要涉及那些重要的類以及方法,整體的輪廓又是如何。
本文主要涉及的類有(本系列的blog地址):
[一]RabbitMQ-客戶端源碼之ConnectionFactory
[二]RabbitMQ-客戶端源碼之AMQConnection
[三]RabbitMQ-客戶端源碼之ChannelManager
[四]RabbitMQ-客戶端源碼之Frame
[五]RabbitMQ-客戶端源碼之AMQChannel
[六]RabbitMQ-客戶端源碼之AMQCommand
[七]RabbitMQ-客戶端源碼之AMQPImpl+Method
[八]RabbitMQ-客戶端源碼之ChannelN
[九]RabbitMQ-客戶端源碼之Consumer
以發(fā)送消息來看看從源碼級的邏輯流轉(zhuǎn)情況。
首先看看發(fā)送消息的業(yè)務(wù)代碼(部分主要的代碼):
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String message = "RabbitMQ Demo Test:" + System.currentTimeMillis(); channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); channel.close(); connection.close();從上面來看,主要牽涉ConnectionFactory, Connection, Channel這個幾個類(有關(guān)Connection和Channel的AMQP流轉(zhuǎn)流程可以參考文中最后部分)。實際情況是怎么樣的呢,我們來分析下。
首先流轉(zhuǎn)過程如下:
ConnectionFactory.newConnection()-- AMQConnection.start()-- MainLoop-- Frame frame = SocketFrameHandler.readFrame()-- AMQChannel.handleFrame(Frame frame)- ConnectionFactory主要用來配置一些參數(shù),并初始化AMQConnection, 這個版本的客戶端與broker底層通信用的是java的原生Socket, 處理模塊為SocketFrameHandler,SocketFrameHandler也在ConnectionFactory調(diào)用newConnection()時創(chuàng)建。之后根據(jù)參數(shù)以及SocketFrameHandler初始化了AMQConnection對象。
- AMQConnection的核心在于這個start()方法。包括Protocol-Header, Connection.Start/.StartOk, Connection.Tune/.TuneOk, Connection.Open/.OpenOk,以及啟動MainLoop線程
- MainLoop是AMQConnection的內(nèi)部私有類,主要用來(循環(huán))讀取(SocketFrameHandler.readFrame)并封裝Socket中的幀F(xiàn)rame, 并進行進一步的處理,這個由AMQChannel來完成
- ChannelManager,這個在上面并沒有展示出來,但是這里也需要說明下,這個是在MainLoop中處理Frame所使用的,用來管理Channel的,確切的來說是ChannelN.
這個接著上面的流程繼續(xù):
AMQChannel.handleFrame(Frame frame)首先調(diào)用AMQCommand的handleFrame(Frame frame)方法來處理,AMQCommand內(nèi)部其實是調(diào)用了CommandAssember(對Method, Content-Header以及Content-Body做了一下封裝,其實忽略這個類也是可以的)的handleFrame, 說白了作用就是處理下Frame幀,方法返回值是boolean, 當一個AMQComand處理完畢后返回true,否則返回false。這里就有疑問了,什么叫做處理完畢?這里就又要說到MainLoop了,MainLoop線程主要循環(huán)讀取Frame幀,像Connection.Start/.StartOk這種命令(AMQCommand)一般只包括Method類型的幀(Frame),AMQCommand的handleFrame方法直接返回true,但是像Basic.Publish這種命令一般包括Method幀,Content-Header幀,以及若干Content-Body幀,就需要handleFrame多次才能返回true。
AMQP技術(shù)術(shù)語
Method: 用于在節(jié)點之間傳遞特定類型的AMQP命令幀。
Content: 服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù).這個術(shù)語是“message”的同義詞。
Content header:描述內(nèi)容屬性特定類型幀。
Content body: 包含原始應(yīng)用程序數(shù)據(jù)的特定類型幀.內(nèi)容體幀完全不透明-服務(wù)器不以任何方式檢查或修改其body內(nèi)容。
上面這個是AMQChannel中的handleFrame方法,當內(nèi)部的AMQCommand的handleFrame方法返回true,即表示處理完畢一條AMQCommand之后再調(diào)用handleCompleteInboundCommand方法進行進一步處理。而這個handleCompleteInboundCommand方法的精髓在于processAsync方法,這個processAsync方法在AMQChannel中是一個抽象方法,真正的實現(xiàn)要看ChannelN這個類。
說到這里有一個點我沒有提及,但是這個不影響主流程的闡述,這個點就是rpc的概念,具體的可以詳細參考對AMQChannel類的介紹——[五]RabbitMQ-客戶端源碼之AMQChannel。
文中開篇的demo示例,采用wireshark工具抓包可得:
這里用來參考,以便更好的闡述Connection類和Channel類。
Connection類
AMQP是一個連接協(xié)議. 連接設(shè)計為長期的,且可運載多個通道. 連接生命周期是這樣的:
- client打開與服務(wù)器的TCP/IP連接并發(fā)送一個協(xié)議頭(protocol header).這只是client發(fā)送的數(shù)據(jù),而不是作為方法格式的數(shù)據(jù).
- server使用其協(xié)議版本和其它屬性,包括它支持安全機制列表(Start方法)進行響應(yīng).
- client選擇一種安全機制(Start-Ok).
- server開始認證過程, 它使用SASL的質(zhì)詢-響應(yīng)模型(challenge-response model). 它向客戶端發(fā)送一個質(zhì)詢(Secure).
- client向server發(fā)送一個認證響應(yīng)(Secure-Ok). 例如,對于使用"plain"機制,響應(yīng)會包含登錄用戶名和密碼.
(server 重復(fù)質(zhì)詢(Secure) 或轉(zhuǎn)到協(xié)商,發(fā)送一系列參數(shù),如最大幀大小(Tune).) - client接受或降低這些參數(shù)(Tune-Ok).
- client 正式打開連接并選擇一個虛擬主機(Open).
- 服務(wù)器確認虛擬主機是一個有效的選擇 (Open-Ok).
- 客戶端現(xiàn)在使用希望的連接.
- 一個節(jié)點(client 或 server) 結(jié)束連接(Close).
- 另一個節(jié)點對連接結(jié)束握手(Close-Ok).
- server 和 client關(guān)閉它們的套接字連接.
沒有為不完全打開的連接上的錯誤進行握手. 根據(jù)成功協(xié)議頭協(xié)商(后面有詳細定義),在發(fā)送或收到Open 或Open-Ok之前,如果一個節(jié)點檢測到錯誤,這個節(jié)點必須關(guān)閉socket,而不需要發(fā)送任何進一步的數(shù)據(jù)。
Channel類
AMQP是一個多通道協(xié)議. 通道提供了一種方式來將一個重量級TCP/IP連接分成多個輕量級連接。這使得協(xié)議對于防火墻更加友好,因為端口使用是可預(yù)測的. 這也意味著傳輸調(diào)整和網(wǎng)絡(luò)服務(wù)質(zhì)量可以得到更好的利用。通道是獨立的,它們可以同時執(zhí)行不同的功能,可用帶寬會在當前活動之間共享。這是令人期待的,我們鼓勵多線程客戶端應(yīng)用程序經(jīng)常使用"每個通道一個線程"編程模型.。然而,從單個client打開一個或多個AMQP servers連接也是完全可以接受的.。
通道生命周期如下:
- client打開一個新通道(Open).
- server確認新通道準備就緒(Open-Ok).
- client和server按預(yù)期來使用通道.
- 一個節(jié)點(client或server) 關(guān)閉了通道(Close).
- 另一個節(jié)點對通道關(guān)閉進行握手(Close-Ok).
附:本系列全集
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-conclusion/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關(guān)注筆者的微信公眾號:朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的[Conclusion]RabbitMQ-客户端源码之总结的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [九]RabbitMQ-客户端源码之Co
- 下一篇: RabbitMQ之镜像队列