[一]RabbitMQ-客户端源码之ConnectionFactory
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-connectionfactory/
首先看一段amqp-client發(fā)送端的示例代碼(展示出主要部分):
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String message = "RabbitMQ Demo Test:" + System.currentTimeMillis(); channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); channel.close(); connection.close();相信使用rabbitmq java客戶(hù)端的同學(xué)來(lái)說(shuō),這段代碼并不陌生,主要的作用是發(fā)送一條消息至broker然后關(guān)閉。通過(guò)wireshark抓包工具可以看到整個(gè)AMQP協(xié)議的流程,如下圖:
(xx.xx.48.240是client的ip,xx.xx.197.73是broker的ip)
下面通過(guò)源碼來(lái)分析下Connection有關(guān)的整個(gè)流程,對(duì)于上面AMQP流程中的Protocol-Header到Connection.Open-Ok的部分。
首先是ConnectionFactory類(lèi)(文章開(kāi)篇的demo中),這里主要包含一些與broker連接的配置參數(shù)等,比如:username, password, virtualHost, host,port, requestedChannelMax, requestedFrameMax, requestedHeartbeat, connectionTimeout, shutdownTimeout(只列出部分)。
這個(gè)類(lèi)中其余都是些Getter和Setter方法,但是有個(gè)newConnection方法是關(guān)鍵,文中開(kāi)篇的demo代碼下面列出詳細(xì)內(nèi)容:
/*** Create a new broker connection, picking the first available address from* the list.** If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>* is enabled, the connection returned by this method will be {@link Recoverable}. Future* reconnection attempts will pick a random accessible address from the provided list.** @param executor thread execution service for consumers on the connection* @param addrs an array of known broker addresses (hostname/port pairs) to try in order* @return an interface to the connection* @throws java.io.IOException if it encounters a problem* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>*/ public Connection newConnection(ExecutorService executor, Address[] addrs)throws IOException, TimeoutException {FrameHandlerFactory fhFactory = createFrameHandlerFactory();ConnectionParams params = params(executor);if (isAutomaticRecoveryEnabled()) {// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnectionAutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);conn.init();return conn;} else {IOException lastException = null;for (Address addr : addrs) {try {FrameHandler handler = fhFactory.create(addr);AMQConnection conn = new AMQConnection(params, handler);conn.start();return conn;} catch (IOException e) {lastException = e;}}throw (lastException != null) ? lastException : new IOException("failed to connect");} }方法中首先是FrameHandlerFactory fhFactory = createFrameHandlerFactory();這個(gè)是用來(lái)處理client與broker之間的通信幀(Frame)的,包括建立通信鏈路(java的原生socket,注意這里沒(méi)有NIO也沒(méi)有netty)。
protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL()); }調(diào)用createFrameHandlerFactory()方法得到FrameHandlerFactory對(duì)象之后再:“ FrameHandler handler = fhFactory.create(addr);”返回的是SocketFrameHandler對(duì)象,這個(gè)對(duì)象是對(duì)Socket的一個(gè)封裝,完全可以看成是一個(gè)Socket對(duì)象。
注意這里的Socket的TCP_NODELAY參數(shù)默認(rèn)設(shè)置為true,而不是默認(rèn)的false。當(dāng)然你也可以調(diào)用ConnectionFactory的setSocketConfigurator自行設(shè)置。
有關(guān)Socket的TCP_NODELAY參數(shù):默認(rèn)情況下發(fā)送數(shù)據(jù)是采用Negale算法。Negale算法是指發(fā)送方數(shù)據(jù)不會(huì)立刻發(fā)送出去,而是先放在緩沖區(qū)內(nèi),等待緩沖區(qū)滿(mǎn)了,在發(fā)出去。Negale算法適用于需要發(fā)送大量數(shù)據(jù)的應(yīng)用場(chǎng)景。這種算法減少傳輸?shù)拇螖?shù)增加性能。但是如果對(duì)于需要即使響應(yīng)的,小批量數(shù)據(jù)的應(yīng)用場(chǎng)景,例如網(wǎng)絡(luò)游戲就不能采用Negale算法了。默認(rèn)是false,表示采用Negale算法。
ConnectionParams 主要用來(lái)配置與broker連接相關(guān)的參數(shù),比如username,password,vhost等。這個(gè)與前面Socket的參數(shù)不同,需要注意區(qū)分。
之后if(isAutomaticRecoveryEnabled()){}之內(nèi)的方法是建立可自動(dòng)恢復(fù)連接的,這個(gè)可以忽略,直接看else里面的代碼,因?yàn)閕f和else大體功能上一致,else里的更通用一些,也是默認(rèn)的。上面提到 FrameHandler handler = fhFactory.create(addr);這段代碼返回的是SocketFrameHandler對(duì)象,之后: AMQConnection conn = new AMQConnection(params, handler);這句通過(guò)參數(shù)和Socket與broker建立連接。之后初始化:conn.start();完成之后客戶(hù)端就已經(jīng)和broker建立了正常的連接了.
有關(guān)AMQConnection的詳細(xì)內(nèi)容將在下一篇文章[二]RabbitMQ-客戶(hù)端源碼之AMQConnection中講述。
附:本系列全集
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-connectionfactory/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的[一]RabbitMQ-客户端源码之ConnectionFactory的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: RabbitMQ之Consumer消费模
- 下一篇: [三]RabbitMQ-客户端源码之Ch