[九]RabbitMQ-客户端源码之Consumer
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-consumer/
在[八]RabbitMQ-客戶端源碼之ChannelN中講述basicConsume的方法時設計到Consumer這個回調函數,Consumer其實是一個接口,真正實現它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父類,里面都是空方法。在用戶使用時可以簡單的采用QueueingConsumer或者采用DefaultConsumer來重寫某些方法。
這里先來看下消費者客戶端的關鍵代碼:
QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicQos(32);channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer)while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [X] Received '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}可以看到QueueingConsumer作為channel.basicConsume的回調函數,之后再進行處理。
在AMQConnection中有關MainLoop的主線程,專門用來"第一線"的處理Broker發送回客戶端從幀。當Basic.Consume/.ConsumeOk開啟消費模式之后,Broker主動的向客戶端發送Basic.Delivery幀,MainLoop線程一步步的調用,最后到ChannelN的processAsync()方法中有:
if (method instanceof Basic.Deliver) {processDelivery(command, (Basic.Deliver) method);return true; }之后調用processDelivery方法:
protected void processDelivery(Command command, Basic.Deliver method) {Basic.Deliver m = method;Consumer callback = _consumers.get(m.getConsumerTag());if (callback == null) {if (defaultConsumer == null) {throw new IllegalStateException("Unsolicited delivery -" + " see Channel.setDefaultConsumer to handle this" + " case.");}else {callback = defaultConsumer;}}Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(),m.getExchange(),m.getRoutingKey());try {this.dispatcher.handleDelivery(callback, m.getConsumerTag(),envelope, (BasicProperties) command.getContentHeader(),command.getContentBody());} catch (Throwable ex) {getConnection().getExceptionHandler().handleConsumerException(this, ex,callback,m.getConsumerTag(), "handleDelivery");} }這個方法首先根據consumerTag從ChannelN中的_consumer這個HashMap中獲取相應的Consumer回調函數,然后調用這個回調函數的handleDeliver()方法進行處理,這里有些同學會有疑問,明明是調用ConsumerDispatcher dispatcher的handleDeliver()方法,其實這里只是包了一層皮,ConsumerDispatcher的handleDeliver()方法就是調用了Consumer的handleDeliver()方法。
我們接下去看看QueueingConsumer這個實現Consumer接口的類是怎么處理的:
@Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {checkShutdown();this._queue.add(new Delivery(envelope, properties, body)); }這里的queue就是一個LinkedBlockingQueue,客戶端程序通過調用nextDelivery()方法來獲取數據:
public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {return handle(_queue.take()); }private Delivery handle(Delivery delivery) {if (delivery == POISON ||delivery == null && (_shutdown != null || _cancelled != null)) {if (delivery == POISON) {_queue.add(POISON);if (_shutdown == null && _cancelled == null) {throw new IllegalStateException("POISON in queue, but null _shutdown and null _cancelled. " +"This should never happen, please report as a BUG");}}if (null != _shutdown)throw Utility.fixStackTrace(_shutdown);if (null != _cancelled)throw Utility.fixStackTrace(_cancelled);}return delivery; }這個nextDelivery方法說白就是一個LinkedBlockingQueue的take()操作,也就是一個可能會阻塞等待的操作。
附:本系列全集
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-client-source-code-of-consumer/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的[九]RabbitMQ-客户端源码之Consumer的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [八]RabbitMQ-客户端源码之Ch
- 下一篇: [Conclusion]RabbitMQ