ActiveMQ源码解析 建立连接
作為一個消息中間件,有客戶端和服務(wù)端兩部分代碼,這次的源碼解析系列主要從客戶端的代碼入手,分成建立連接、消息發(fā)送、消息消費三個部分。趁著我昨天弄明白了源碼編譯的興奮勁頭還沒過去,今天研究一下建立連接的部分。
如果讀起來吃力,代碼部分可以略過,我把主要的功能點給加粗。
通常來說,客戶端使用MQ的API建立時,可以分成兩個步驟:
客戶端示例代碼:
可以看到主要的方法是ActiveMQConnectionFactory的構(gòu)造函數(shù),和createConnection(),以及connection中的start()方法。
ActiveMQConnectionFactory中的createConnection
構(gòu)造函數(shù)比較簡單,直接把傳入的用戶名密碼和url放在變量里
createConnection方法指向了createActiveMQConnection方法,該方法中主要做的事情有三個:
configureConnection(connection);這個方法的作用是對實例化出的ActiveMQConnetion對象中的參數(shù)的一系列配置,代碼有點長就不上了。
對于我們來說其實主要想看的是連接是如何建立起來的,也就是
createTransport();方法中包含了對客戶端傳入的url的初步校驗,主要是驗證URL的合法性,而后調(diào)用工廠類TransportFactory.connection(url)來進(jìn)行連接的建立。
我們客戶端在建立連接的時候,有可能有TCP、UDP等等協(xié)議,AMQ實現(xiàn)了簡單工廠類FactoryFinder,在TransportFactory.connection(url)方法中,先是通過FactoryFinder根據(jù)用戶輸入的url(比如tcp://192.168.0.1)來找到使用的協(xié)議工廠TcpTransportFactory,然后使用TcpTransportFactory中的類來進(jìn)行連接的建立。這個過程從代碼上來看有點曲折:
ObjectFactory的設(shè)計也是很有趣的。AMQ在代碼中的說法是之所以這么實現(xiàn)是因為這樣如果用戶想自己寫一個ObjectFactory,也可以支持。
/*** The strategy that the FactoryFinder uses to find load and instantiate Objects* can be changed out by calling the* {@link org.apache.activemq.util.FactoryFinder#setObjectFactory(org.apache.activemq.util.FactoryFinder.ObjectFactory)}* method with a custom implementation of ObjectFactory.** The default ObjectFactory is typically changed out when running in a specialized container* environment where service discovery needs to be done via the container system. For example,* in an OSGi scenario.*/public interface ObjectFactory {/*** @param path the full service path* @return*/public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;}Anyway,我們現(xiàn)在通過這么曲折的過程得到了一個實例化的TcpTransportFactory對象,下一步應(yīng)該是調(diào)用doConnect(url)方法進(jìn)行連接的建立了。因為TcpTransportFactory繼承了TransportFactory類,doConnect方法仍然是在TransportFactory中的:
public Transport doConnect(URI location) throws Exception {try {//把url里關(guān)于Transport的配置提取出來,WireFormat基本都可以看成是url的配置。//如果使用Openwire(默認(rèn)協(xié)議),那么WireFormat就是openwire的相關(guān)配置。//見http://activemq.apache.org/configuring-wire-formats.htmlMap<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));if( !options.containsKey("wireFormat.host") ) {options.put("wireFormat.host", location.getHost());}WireFormat wf = createWireFormat(options);//建立socket連接Transport transport = createTransport(location, wf);//裝飾者模式,在連接上包裝上MutexTransportFilter、WireFormatNegotiator、InactivityMonitor、ResponseCorrelator四個功能Transport rc = configure(transport, wf, options);//remove autoIntrospectionSupport.extractProperties(options, "auto.");if (!options.isEmpty()) {throw new IllegalArgumentException("Invalid connect parameters: " + options);}return rc;} catch (URISyntaxException e) {throw IOExceptionSupport.create(e);}}這個方法中主要有三個重要功能:
其中配置WireFormat可以不看,建立TcpTransport其實是在調(diào)用createTransport(location, wf);時引用了TcpTransport的構(gòu)造函數(shù),代碼如下:
這里直接調(diào)用了socketFactory.createSocket();,使用的是默認(rèn)的方法,所以無法指定本地網(wǎng)卡建立連接。我看了下其實可以用socketFactory.createSocket(host, port, localHost, localPort)來改寫,改寫后就可以指定本地IP和端口了。
此外,查了下網(wǎng)絡(luò)上的資料,四大輔助后續(xù)再看:
MutexTransportFilter類實現(xiàn)了對每個請求的同步鎖,同一時間只允許發(fā)送一個請求,如果有第二個請求需要等待第一個請求發(fā)送完畢才可繼續(xù)發(fā)送。
WireFormatNegotiator類實現(xiàn)了在客戶端連接broker的時候先發(fā)送數(shù)據(jù)解析相關(guān)的協(xié)議信息,例如解析版本號,是否使用緩存等信息。
InactivityMonitor類實現(xiàn)了連接成功后啟動心跳檢查機制,客戶端每10秒發(fā)送一次心跳信息,服務(wù)端每30秒讀一次心跳信息,如果沒有讀到則會斷開連接,心跳檢測是相互的,客戶端也會每30秒讀取服務(wù)端發(fā)送來的心跳信息,如果沒有讀到也一樣會斷開連接。
ResponseCorrelator類實現(xiàn)了異步請求但需要獲取響應(yīng)信息否則就會阻塞等待功能。
ActiveMQConnection的Start()
在使用AMQ的過程中,很多用戶問我為什么Connection需要start(),不能在createConnection的時候直接start了嗎?而且不調(diào)用start方法其實不影響發(fā)送,但是會影響接收。抱著這樣的疑惑,我們來看一下源碼。
源碼里直接對start方法加了注釋,說明start就是啟動connection可以接收消息的功能。其實源碼里可以很明顯看出來start里包含了幾個步驟:
我好奇的是第二步,看看源碼
/*** Send the ConnectionInfo to the Broker** @throws JMSException*/protected void ensureConnectionInfoSent() throws JMSException {synchronized(this.ensureConnectionInfoSentMutex) {// Can we skip sending the ConnectionInfo packet??if (isConnectionInfoSentToBroker || closed.get()) {return;}//TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?if (info.getClientId() == null || info.getClientId().trim().length() == 0) {info.setClientId(clientIdGenerator.generateId());}syncSendPacket(info.copy(), getConnectResponseTimeout());this.isConnectionInfoSentToBroker = true;// Add a temp destination advisory consumer so that// We know what the valid temporary destinations are on the// broker without having to do an RPC to the broker.ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());if (watchTopicAdvisories) {advisoryConsumer = new AdvisoryConsumer(this, consumerId);}}}從源碼里還能看到討論和待辦……我覺得我已經(jīng)深入核心了……這個方法里做了兩件事,
發(fā)送ConnectionInfo的數(shù)據(jù)包到服務(wù)端,如果info里沒有用戶自己設(shè)定的clientID,還會自動幫忙生成一個。發(fā)送的時候調(diào)用的是syncSendPacket方法,很明顯是個同步發(fā)送,需要服務(wù)端同步返回response才算發(fā)送成功,我理解這里應(yīng)該是一個試探連接的步驟。
建立一個通往臨時目的地的消費者。所以其實每一個ActiveMQConnection的連接中都自動包含了一個消費者。我臨時寫了個客戶端試了下,的確是存在的。
奇葩的是我就算不調(diào)用connection.start()方法,直接發(fā)送消息,這個臨時消費者也是存在的,所以肯定是在消息發(fā)送的時候的哪個地方調(diào)用了connection的start方法。
至于為什么不調(diào)用start()方法就無法消費,我看了下session的start方法:
/*** Start this Session.** @throws JMSException*/protected void start() throws JMSException {started.set(true);for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {ActiveMQMessageConsumer c = iter.next();c.start();}executor.start();}原來是在session的start方法里啟動了這個session里的consumer,想想session的建立過程的確是不需要調(diào)用session.start方法的,但是我們一般是先調(diào)用start方法,而后建立consumer,這個邏輯順序還是有點錯亂……
等下一次研究接收端的源碼時再深入吧。
本次發(fā)現(xiàn)的源碼優(yōu)化點
我真的去試了一下……在ConnectionInfo里添加了一條test=test,然后重新編譯服務(wù)端和客戶端的依賴jar包,開啟MQ的logging plugins,并且用客戶端去監(jiān)聽了一下ActiveMQ.Advisory.Connection,得到了這樣的結(jié)果。
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ源码解析 建立连接的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python爬虫高级项目管理师培训学校_
- 下一篇: 代码整洁之道-编写 Pythonic 代