[六]RabbitMQ-客户端源码之AMQCommand
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-amqcommand/
AMQCommand是用來處理AMQ命令的,其包含了Method, Content Heaeder和Content Body.
下面是通過wireshark抓包的AMQP協(xié)議
上圖中的Basic.Publish命令就包含Method, Content header以及Content body。
AMQCommand不是直接包含Method等成員變量的,而是通過CommandAssembler又做了一次封裝。
接下來先看下CommandAssembler類。此類中有這些成員變量:
- CAState state標(biāo)識(shí)這此Command目前的狀態(tài),是準(zhǔn)備處理Method(EXPECTING_METHOD),還是處理Content header(EXPECTING_CONTENT_HEADER),還是準(zhǔn)備處理Content body(EXPECTING_CONTENT_BODY),還是以及完成了(COMPLETE)。
- Method method代表type=Method的AMQP幀
- AMQContentHeader contentHeader代表type=Content header的AMQP幀
- final List<byte[]> bodyN代表type=Content body的AMQP幀,就是真正的消息體(Message body)。
- bodyLength就是消息體大小
這個(gè)類中除了構(gòu)造函數(shù),getMethod, getContentHeader, getContentBody,isComplete這個(gè)幾個(gè)方法,最關(guān)鍵的方法就是:
public synchronized boolean handleFrame(Frame f) throws IOException {switch (this.state) {case EXPECTING_METHOD: consumeMethodFrame(f); break;case EXPECTING_CONTENT_HEADER: consumeHeaderFrame(f); break;case EXPECTING_CONTENT_BODY: consumeBodyFrame(f); break;default:throw new AssertionError("Bad Command State " + this.state);}return isComplete(); }這個(gè)方法主要是處理AQMP幀的,根據(jù)CAState state來處理相應(yīng)狀態(tài)類型的幀,然后賦值給相應(yīng)的成員變量。
采用consumeMethodFrame(Frame f)方法舉個(gè)例子:
這個(gè)方法首先判斷當(dāng)前幀是否是Method幀(AMQP.FRAME_METHOD),然后調(diào)用AMQPImp.readMethodFrom的方法。就那Connection.Start這個(gè)真來將,它會(huì)從socket的輸入流中讀取:
public Start(MethodArgumentReader rdr) throws IOException {this(rdr.readOctet(), rdr.readOctet(), rdr.readTable(), rdr.readLongstr(), rdr.readLongstr()); }對應(yīng)于下圖:
- 第一個(gè)rdr.readOctet()是指Version-Magor:0
- 第二個(gè)rdr.readOctet()是指Version-Minor:9
- 第三個(gè)rdr.readTable()是指Server-Properties
- 第四個(gè)rdr.readLongstr()是指Mechanisms
- 第五個(gè)rdr.readLongstr()是指Locales
而MethodArgumentReader.readOctet()就是:
public final int readOctet()throws IOException {clearBits();return in.readOctet();//in對象是DataInputStream對象 }寫到這里,思路再跳回來,知道了底層其實(shí)是Socket的DataInputStream,其上只是做了封裝再封裝
CommandAssembler 中的handleFrame這個(gè)方法只在AMQCommand中的:
只在這個(gè)方法中調(diào)用。CommandAssembler只是對Method,Content-Header,Content-Body做了一下封裝。下面繼續(xù)回到AMQCommand這個(gè)類中來。
仔細(xì)閱讀源碼的同學(xué)會(huì)發(fā)現(xiàn)在handleFrame方法當(dāng)遇到類似Basic.Publish時(shí)會(huì)有Method,Content-Header,Content-Body一起的報(bào)文,那么handleFrame處理完Method之后就直接返回了,沒有完全處理完,這該如何是好?
這個(gè)就又要聯(lián)系到AMQConnection中的MainLoop的內(nèi)部類了。此類中的關(guān)鍵代碼如下:
可以看到這是一個(gè)一直輪詢讀取Frame并處理Frame的過程。在遇到類似Basic.Publish這種帶Method, Content-Header, Content-Body的類型的報(bào)文時(shí),會(huì)循環(huán)處理,直到處理完成。注意這里的Method, Content-Header以及Content-Body都是看成單個(gè)Frame的,也就是這個(gè)while循環(huán)要三次,而不是將Basic.Publish看成一個(gè)幀。
上面調(diào)用的handleFrame方法是AMQChannel類中的(詳細(xì)可以參考([五]RabbitMQ-客戶端源碼之AMQChannel)):
可以看到只有當(dāng)AMQCommand的handleFrame方法返回true時(shí),即執(zhí)行完成之后才會(huì)繼續(xù)處理。
AMQCommand也有g(shù)etMethod, getContentHeader, getContentBody等方法,這些都是間接調(diào)用CommandAssembler類中相應(yīng)的方法的。
AMQCommand中也有個(gè)特別重要的方法:
這段主要通過傳輸AMQP幀的,通過AMQChannel獲取到通信鏈路connection,然后將AMQCommand對象自身的method成員變量(或者包括content-header以及content-body)傳送給broker。這段方法里還有判斷payload大小是否超過broker端所設(shè)置的最大幀大小frameMax,即(frameMax == 0) ? body.length : frameMax - EMPTY_FRAME_SIZE這段代碼。當(dāng)frameMax=0時(shí)則沒有大小限制,當(dāng)frameMax不為0時(shí)則按照payload拆分成若干的payload然后發(fā)送多個(gè)FRAME_BODY幀。
附:本系列全集
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-amqcommand/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的[六]RabbitMQ-客户端源码之AMQCommand的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [五]RabbitMQ-客户端源码之AM
- 下一篇: [七]RabbitMQ-客户端源码之AM