[八]RabbitMQ-客户端源码之ChannelN
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channeln/
ChannelN是整個RabbitMQ客戶端最核心的一個類了,其包含的功能點甚多,這里需要分類闡述。
首先來看看ChannelN的成員變量:
源代碼中有關ChannelN的呈現順序有所不同,這里博主為了區分開來,重新排了序。
processAsync(Command command)
在AMQChannel這個抽象類中唯一的抽象方法即為此方法,這個方法主要用來針對接受到broker的AMQCommand進行進一步的處理,至于怎么接受Socket,怎么封裝成幀,怎么確定一個AMQComand已經封裝完畢,都已在調用此方法前完成。此方法可以處理:Channel.Close, Basic.Deliver, Basic.Return, Channel.Flow, Basic.Ack, Basic.Nack, Basic.RecoverOk, Basic.Cancel, Channel.CloseOk等這些從broker端回傳的AMQComand.
這個方法也比較長,下面也會涉及到這個方法內的內容。
Confirm.Select & Basic.Publish
在RabbitMQ之消息確認機制(事務+Confirm)這篇文章中,博主就講到RabbitMQ的producer端確認機制分為事務機制和Confirm機制,這里就來闡述下Confirm機制的內部實現。
和Confirm機制有關的成員變量有:
在使用Confirm機制的時候,首先要置Channel為Confirm模式,即向broker端發送Confirm.Select。
業務代碼(DEMO實例):
在創建完Channel之后調用channel.confirmSelect()方法即可,confirmSelect()代碼如下:
public Confirm.SelectOk confirmSelect()throws IOException {if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;return (Confirm.SelectOk)exnWrappingRpc(new Confirm.Select(false)).getMethod(); }這里的成員變量nextPublishSeqNo是用來為Confirm機制服務的,當Channel開啟Confirm模式的時候,nextPublishSeqNo=1,標記第一條publish的序號,當Publish時:
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException {if (nextPublishSeqNo > 0) {unconfirmedSet.add(getNextPublishSeqNo());nextPublishSeqNo++;}BasicProperties useProps = props;if (props == null) {useProps = MessageProperties.MINIMAL_BASIC;}transmit(new AMQCommand(new Basic.Publish.Builder().exchange(exchange).routingKey(routingKey).mandatory(mandatory).immediate(immediate).build(),useProps, body)); }client端向broker端Basic.Pubish發送消息并將當前的序號加入到unconfirmedSet中,并自加nextPublishSeqNo++等待下一個消息的發送。
有關Confirm.Select的詳細用法可以參考:RabbitMQ之消息確認機制(事務+Confirm)
之后等待broker的確認回復(Basic.Ack/.Nack):channel.waitForConfirms()
public boolean waitForConfirms(long timeout)throws InterruptedException, TimeoutException {if (nextPublishSeqNo == 0L)throw new IllegalStateException("Confirms not selected");long startTime = System.currentTimeMillis();synchronized (unconfirmedSet) {while (true) {if (getCloseReason() != null) {throw Utility.fixStackTrace(getCloseReason());}if (unconfirmedSet.isEmpty()) {boolean aux = onlyAcksReceived;onlyAcksReceived = true;return aux;}if (timeout == 0L) {unconfirmedSet.wait();} else {long elapsed = System.currentTimeMillis() - startTime;if (timeout > elapsed) {unconfirmedSet.wait(timeout - elapsed);} else {throw new TimeoutException();}}}} }可以看到waitForConfirms其實本質上是在等待unconfirmedSet變成empty,否則就線程wait()。
當接收到broker端的ACK/NACK回復時,一步步的經過處理到達processAsync(Command command)方法,然后進而處理Basic.Ack/.Nack幀。
else if (method instanceof Basic.Ack) {Basic.Ack ack = (Basic.Ack) method;callConfirmListeners(command, ack);handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);return true; } else if (method instanceof Basic.Nack) {Basic.Nack nack = (Basic.Nack) method;callConfirmListeners(command, nack);handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);return true; }首先是將相應的Method做一下轉換,之后callConfirmListeners(),這個方法是調用成員變量confirmListeners這個list里的所有的ConfirmListener:
private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();這個ConfirmListener的list就需要在channel.basicPushlish()調用之前先:
channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) throws IOException {//TODO}public void handleNack(long deliveryTag, boolean multiple) throws IOException {//TODO} });在調用完ConfirmListener之后繼續調用handleAckNack方法:
private void handleAckNack(long seqNo, boolean multiple, boolean nack) {if (multiple) {unconfirmedSet.headSet(seqNo + 1).clear();} else {unconfirmedSet.remove(seqNo);}synchronized (unconfirmedSet) {onlyAcksReceived = onlyAcksReceived && !nack;if (unconfirmedSet.isEmpty())unconfirmedSet.notifyAll();} }這個方法本意上是對收到某條消息的ACK或者NACK的處理,發送消息時Basic.Publish的nextPublishNo對應于相應的ACK/NACK的deliveryTag,將其從unconfirmedSet中刪除即可,如果有NACK幀,則將其相應的標識onlyAcksReceived設置為false,判斷此時unconfirmedSet是否為空,如果條件成立則notifyAll(),將waitForConfirm喚起,返回onlyAcksReceived的狀態。
如果channel.waitForConfirm()返回為false,則說明broker沒有接受client發送的消息,此時需要在業務代碼中做進一步處理,比如重發。
Basic.Qos
消費者在開啟ACK的情況下,對接受到的消息可以根據業務的需要異步對消息進行確認。
然而在實際使用過程中,由于消費者自身處理能力有限,從RabbitMQ獲取一定數量的消息好厚,希望rabbitmq不再將隊列中的消息推送過來,當對消息處理完后(即對消息進行了ack,并且有能力處理更多的消息)再接受來自隊列的消息。在這種場景下,我們可以設置Basic.Qos中的prefetch_count來達到這個效果。
Basic.Consume
與消費有關的成員變量:
private final Map<String, Consumer> _consumers =Collections.synchronizedMap(new HashMap<String, Consumer>()); private volatile Consumer defaultConsumer = null; private final ConsumerDispatcher dispatcher;源碼如下:
/*** Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}* method.* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @param consumerTag a client-generated consumer tag to establish context* @param noLocal true if the server should not deliver to this consumer* messages published on this channel's connection* @param exclusive true if this is an exclusive consumer* @param callback an interface to the consumer object* @param arguments a set of arguments for the consume* @return the consumerTag associated with the new consumer* @throws java.io.IOException if an error is encountered* @see com.rabbitmq.client.AMQP.Basic.Consume* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk*/ public String basicConsume(String queue, boolean autoAck, String consumerTag,boolean noLocal, boolean exclusive, Map<String, Object> arguments,final Consumer callback)throws IOException {BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>() {public String transformReply(AMQCommand replyCommand) {String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();_consumers.put(actualConsumerTag, callback);dispatcher.handleConsumeOk(callback, actualConsumerTag);return actualConsumerTag;}};rpc(new Basic.Consume.Builder().queue(queue).consumerTag(consumerTag).noLocal(noLocal).noAck(autoAck).exclusive(exclusive).arguments(arguments).build(),k);try {return k.getReply();} catch(ShutdownSignalException ex) {throw wrap(ex);} }這個方法最精簡的只要兩個參數,即String queue和Consumer callback:public String basicConsume(String queue, Consumer callback)。
方法主要是發送Basic.Consume幀,然后等待Basic.ConsumeOk幀。待收到broker端的Basic.ConsumeOk幀之后,觸發BlockingRpcContinuation中的transformReply()方法。有關BlockingRpcContinuation在[五]RabbitMQ-客戶端源碼之AMQChannel中有陳述。transformReply()方法先是提取consumerTag,這個consumerTag是在channel.basicConsume()方法中設置的,是其中的一個參數,如果設置了此參數,那么consumerTag就是這個參數的值;如果沒有設置這個consumerTag,Broker會返回一個consumerTag,類似:amq.ctag-Mg0eSv2GgfG6UzfncD8E9g。然后作為key和Consumer這個回調函數一起放置到_consumer這個回調函數中以備后面檢索調用。這個consumerTag還作為transformReply()方法的返回值,存入到BlockingRpcContinuation對象中,既而在basicConsume這個方法最后調用k.getReply()方法是獲取其值,也就是說basicConsume方法的返回值就是consumerTag。
當發送Basic.Consume幀之后,由broker返回的是Basic.ConsumeOk幀+Basic.Deliver幀,Basic.ConsumerOk幀由上面方法處理,Basic.Deliver幀由processAsync處理。
說到basicConsume方法,還有一個重要的就是設置Consumer這個回調函數。一般為了方便直接使用RabbitMQ客戶端自帶的QueueingConsumer來處理,當然也可以實現一個自定義的Consumer,當然了需要實現Consumer這個接口,可以參考QueueingConsumer的父類DefaultConsumer, 有關Consumer相關的更多細節,可以參考:[九]RabbitMQ-客戶端源碼之Consumer。
dispatcher.handleConsumeOk(callback, actualConsumerTag);這段代碼實際上就是:callback.handleConsumeOk(actualConsumerTag),這個還是調用到Consumer的方法處理。
Basic.Get
上面的Basic.Consume是基于push模式的,而Basic.Get是基于pull模式的。相關的代碼如下:
public GetResponse basicGet(String queue, boolean autoAck)throws IOException {AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder().queue(queue).noAck(autoAck).build());Method method = replyCommand.getMethod();if (method instanceof Basic.GetOk) {Basic.GetOk getOk = (Basic.GetOk)method;Envelope envelope = new Envelope(getOk.getDeliveryTag(),getOk.getRedelivered(),getOk.getExchange(),getOk.getRoutingKey());BasicProperties props = (BasicProperties)replyCommand.getContentHeader();byte[] body = replyCommand.getContentBody();int messageCount = getOk.getMessageCount();return new GetResponse(envelope, props, body, messageCount);} else if (method instanceof Basic.GetEmpty) {return null;} else {throw new UnexpectedMethodError(method);} }基本上就是客戶端發送Basic.Get至Broker,Broker返回Basic.GetOK并攜帶數據。注意方法最后返回GetResponse對象,這個對象就是包裝了一下數據。
事務
和事務有關的代碼:
/** Public API - {@inheritDoc} */ public Tx.SelectOk txSelect()throws IOException {return (Tx.SelectOk) exnWrappingRpc(new Tx.Select()).getMethod(); }/** Public API - {@inheritDoc} */ public Tx.CommitOk txCommit()throws IOException {return (Tx.CommitOk) exnWrappingRpc(new Tx.Commit()).getMethod(); }/** Public API - {@inheritDoc} */ public Tx.RollbackOk txRollback()throws IOException {return (Tx.RollbackOk) exnWrappingRpc(new Tx.Rollback()).getMethod(); }這里可以看到基本對于事務的處理是采用rpc的方法一對一的進行交互,有關RabbitMQ的事務機制可以參考:RabbitMQ之消息確認機制(事務+Confirm)。
其余
ChannelN還有:
- 關于Exchange,Queue的申明創建,刪除,綁定,解綁
- 關閉處理
- Basic.Return
- Basic.Flow
- Basic.Recover
- Basic.Cancel
- Basic.Ack/.Nack/.Reject
這些就不做詳細介紹了。有興趣的同學可以繼續翻閱源碼,這些都比較簡單。
附:本系列全集
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-channeln/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的[八]RabbitMQ-客户端源码之ChannelN的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [七]RabbitMQ-客户端源码之AM
- 下一篇: [九]RabbitMQ-客户端源码之Co