RabbitMQ之mandatory和immediate
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-analysis-of-mandatory-and-immediate/
1. 概述
mandatory和immediate是AMQP協議中basic.publish方法中的兩個標識位,它們都有當消息傳遞過程中不可達目的地時將消息返回給生產者的功能。對于剛開始接觸RabbitMQ的朋友特別容易被這兩個參數搞混,這里博主整理了寫資料,簡單講解下這兩個標識位。
mandatory
當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返回給生產者(Basic.Return + Content-Header + Content-Body);當mandatory設置為false時,出現上述情形broker會直接將消息扔掉。
immediate
當immediate標志位設置為true時,如果exchange在將消息路由到queue(s)時發現對于的queue上么有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。
概括來說,mandatory標志告訴服務器至少將該消息route到一個隊列中,否則將消息返還給生產者;immediate標志告訴服務器如果該消息關聯的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。
2. mandatory
在生產者通過channle的basicPublish方法發布消息時,通常有幾個參數需要設置,為此我們有必要了解清楚這些參數代表的具體含義及其作用,查看channel接口,會發現存在3個重載的basicPublish方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;mandatory和immediate上面已經解釋過了,其余的參數分別是:
exchange:交換機名稱
routingkey:路由鍵
props:消息屬性字段,比如消息頭部信息等等
body:消息主體部分
本節主要講述mandatory, 下面我們寫一個demo,在RabbitMQ broker中有:
exchange : exchange.mandatory.test
queue: queue.mandatory.test
exchange路由到queue的routingkey是mandatory
這里先不講當前的exchange綁定到queue中,即:
詳細代碼如下:
package com.vms.test.zzh.rabbitmq.self;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;/*** Created by hidden on 2017/2/7.*/ public class RBmandatoryTest {public static final String ip = "xx.xx.xx.73";public static final int port = 5672;public static final String username = "root";public static final String password = "root";public static final String queueName = "queue.mandatory.test";public static final String exchangeName = "exchange.mandatory.test";public static final String routingKey = "mandatory";public static final Boolean mandatory = true;public static final Boolean immediate = false;public static void main(String[] args) {try {ConnectionFactory factory = new ConnectionFactory();factory.setHost(ip);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.basicQos(1);channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes()); // channel.close(); // connection.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}} }運行,之后通過wireshark抓包工具可以看到如下圖所示:
這里可以看到最后執行了basic.return方法,將發布者發出的消息返回給了發布者,查看協議的arguments參數部分可以看到:reply-text字段值為NO_ROUTE,表示消息并沒有路由到合適的隊列中;
那么我們該怎么獲取到沒有被正確路由到合適隊列的消息呢?這時候可以通過為channel信道設置ReturnListener監聽器來實現,具體代碼(main函數部分):
try {ConnectionFactory factory = new ConnectionFactory();factory.setHost(ip);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.basicQos(1);channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {String message = new String(body);System.out.println("Basic.return返回的結果是:"+message);}});// channel.close(); // connection.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}運行結果:
Basic.return返回的結果是:===mandatory===下面我們來看一下,設置mandatory標志且exchange路由到queue中,代碼部分只需要將:
channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());改為
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());即可。
通過wireshark抓包如下:
可以看到并不會有basic.return方法被調用。查看RabbitMQ管理界面發現消息已經到達了隊列。
3. immediate
在RabbitMQ3.0以后的版本里,去掉了immediate參數的支持,發送帶immediate標記的publish會返回如下錯誤:
“{amqp_error,not_implemented,“immediate=true”,‘basic.publish’}”
為什么移除immediate標記,參見如下版本變化描述:
Removal of “immediate” flag
What changed? We removed support for the rarely-used “immediate” flag on AMQP’s basic.publish.
Why on earth did you do that? Support for “immediate” made many parts of the codebase more complex, particularly around mirrored queues. It also stood in the way of our being able to deliver substantial performance improvements in mirrored queues.
What do I need to do? If you just want to be able to publish messages that will be dropped if they are not consumed immediately, you can publish to a queue with a TTL of 0.
If you also need your publisher to be able to determine that this has happened, you can also use the DLX feature to route such messages to another queue, from which the publisher can consume them.
這段解釋的大概意思是:immediate標記會影響鏡像隊列性能,增加代碼復雜性,并建議采用“TTL”和“DLX”等方式替代。
參考資料
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/rabbitmq-analysis-of-mandatory-and-immediate/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的RabbitMQ之mandatory和immediate的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ配置
- 下一篇: RabbitMQ之队列优先级