javascript
RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列
搭建SpringBoot項(xiàng)目,用于演示
springboot版本
<!-- spring boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.3.2.RELEASE</version><type>pom</type><!-- import 導(dǎo)入父工程的配置--><scope>import</scope></dependency>消費(fèi)與提供方的pom.xml
<dependencies><!-- spring-boot-starter-web spring-boot-starter-actuator綁定在一塊 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>提供端的application.yml
# 配置RabbitMQ的基本信息 ip 端口 username password 虛擬機(jī).. spring:rabbitmq:host: 192.168.93.132 # ipport: 5672username: xiaofupassword: xiaofuvirtual-host: /springboot# 舊版本 開(kāi)啟 confirm 確認(rèn)模式 # publisher-confirms: true# 新版的開(kāi)啟 confirm 確認(rèn)模式publisher-confirm-type: correlated# 開(kāi)啟 return 退回模式publisher-returns: true消費(fèi)端的application.yml
spring:rabbitmq:host: 192.168.93.132 #主機(jī)ipport: 5672 #端口username: xiaofupassword: xiaofuvirtual-host: /springboot# 舊版本 開(kāi)啟 confirm 確認(rèn)模式# publisher-confirms: true# 新版的開(kāi)啟 confirm 確認(rèn)模式publisher-confirm-type: correlated# 開(kāi)啟 return 退回模式publisher-returns: truelistener:# RabbitMQ模式使用simple simple支持事務(wù)的simple:# Consumer ACK機(jī)制:設(shè)置為手動(dòng)簽收acknowledge-mode: manualprefetch: 1 # 限流,配置1 表示消費(fèi)端每次向MQ拉取最大一條消息# direct 是不支持事務(wù)的 # direct: # acknowledge-mode: manual # ACK機(jī)制:設(shè)置為手動(dòng)簽收 # retry: # enabled: true # 是否支持重試 # max-attempts: 3 # 重試機(jī)制,3次1.消息的可靠投遞
在使用 RabbitMQ 的時(shí)候,作為消息發(fā)送方希望杜絕任何消息丟失或者投遞失敗場(chǎng)景。RabbitMQ 為我們提
供了兩種方式用來(lái)控制消息的投遞可靠性模式。
? confirm 確認(rèn)模式
? return 退回模式
rabbitmq 整個(gè)消息投遞的路徑為:
producer—>rabbitmq broker—>exchange—>queue—>consumer
? 消息從 producer 到 exchange 則會(huì)返回一個(gè) confirmCallback 。
?== 消息從 exchange–>queue 投遞失敗則會(huì)返回一個(gè) returnCallback 。==
我們將利用這兩個(gè) callback 控制消息的可靠性投遞
? exchange要持久化
? queue要持久化
? message要持久化
1.1配置confirm 確認(rèn)模式與return 退回模式
# 舊版本 開(kāi)啟 confirm 確認(rèn)模式 # publisher-confirms: true# 新版的開(kāi)啟 confirm 確認(rèn)模式publisher-confirm-type: correlated# 開(kāi)啟 return 退回模式publisher-returns: true1.2創(chuàng)建用于測(cè)試消息的可靠投遞的交換機(jī)與隊(duì)列
package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /*消息的可靠投遞創(chuàng)建測(cè)試交換機(jī)與隊(duì)列測(cè)試交換機(jī)可靠性記得在application.yml中配置#開(kāi)啟 confirm 確認(rèn)模式 設(shè)置為默認(rèn)的自動(dòng)確認(rèn)模式publisher-confirm-type: none*/ @Configuration public class RabbitMQConfigConfirmAndReturn {//創(chuàng)建交換機(jī)@Beanpublic Exchange exchangeConfirm(){//創(chuàng)建一個(gè)Direct:定向,把消息交給符合指定routing key 的隊(duì)列的交換機(jī)return ExchangeBuilder.directExchange("test_Exchange_Confirm").build();}//創(chuàng)建一個(gè)隊(duì)列@Beanpublic Queue queueConfirm(){//創(chuàng)建一個(gè)隊(duì)列而且是持久的return QueueBuilder.durable("test_Queue_Confirm").build();}// 隊(duì)列和交換機(jī)綁定關(guān)系 Binding/*1. 指定哪個(gè)隊(duì)列2. 指定哪個(gè)交換機(jī)3. routing key*/@Beanpublic Binding bindingConfirm(@Qualifier("exchangeConfirm") Exchange exchange,@Qualifier("queueConfirm") Queue queue){//把隊(duì)列綁定在交換機(jī)上指定routingKey沒(méi)有參數(shù)return BindingBuilder.bind(queue).to(exchange).with("testConfirm").noargs();}// return 退回模式//使用test_Exchange_Confirm這個(gè)交換機(jī)}1.3 ProducerTest測(cè)試類中編寫(xiě)測(cè)試confirm與return測(cè)試方法
1.3.1首先在測(cè)試類中注入RabbitTemplate
@SpringBootTest @RunWith(SpringRunner.class) public class ProducerTest {//1.注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate; }1.3.2測(cè)試方法
/*** 確認(rèn)模式: 該模式是來(lái)校驗(yàn)消息是否發(fā)送成功到交換機(jī)中* 步驟:* 1.確認(rèn)開(kāi)啟: publisher-confirm-type: none* 2.在rabbitTemplate定義一個(gè)confirmCallBack回調(diào)函數(shù)*/@Testpublic void testConfirm(){//使用rabbitTemplate的確認(rèn)回調(diào)方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** @param correlationData 相關(guān)的配置信息* @param ack 代表了Exchange交換機(jī)是否收到了消息,true表示收到了消息,false表示交換機(jī)沒(méi)有收到消息* @param cause 失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("發(fā)送消息后,回調(diào)方法執(zhí)行了~~~");if (ack){System.out.println("發(fā)送消息成功,啟動(dòng)成功方案:"+cause);}else {System.out.println("發(fā)送消息失敗,啟動(dòng)失敗方案:"+cause);}}});//發(fā)送消息,假設(shè)寫(xiě)錯(cuò)交換機(jī)的名稱,肯定會(huì)發(fā)送到Exchange失敗,就會(huì)執(zhí)行我們的confirmCallBack回調(diào)方法rabbitTemplate.convertAndSend("test_Exchange_Confirm","testConfirm","測(cè)試Confirm確認(rèn)模式~~~");}/*** 回退模式: 該模式是用來(lái)校驗(yàn)該消息是否從Exchange交換機(jī)成功路由到了queue隊(duì)列中* 當(dāng)Exchange路由到queue失敗后,就會(huì)執(zhí)行這個(gè)ReturnCallBack方法** 步驟:* 1.開(kāi)啟回退模式: publisher-returns: true* 2.設(shè)置ReturnCallBack* 3,設(shè)置Exchange處理的消息的模式* 1.如果消息沒(méi)有路由到Queue中,則丟棄消息(默認(rèn))* 2.如果消息沒(méi)有路由到Queue中,返回給消息到發(fā)送方的ReturnCallBack方法*/@Testpublic void testReturn() {//設(shè)置ReturnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/**** @param message 消息對(duì)象* @param replyCode 錯(cuò)誤碼* @param replyText 錯(cuò)誤信息* @param exchange 交換機(jī)* @param routingKey 路由鍵*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//當(dāng)我們的消息發(fā)送從Exchange交換機(jī)發(fā)送到Queue錯(cuò)誤后就會(huì)執(zhí)行這個(gè)回調(diào)方法System.out.println("ReturnCallBack 執(zhí)行了~~~");System.out.println(message);System.out.println(replyCode);System.out.println(replyText);System.out.println(exchange);System.out.println(routingKey);}});//發(fā)送消息,/*測(cè)試1:使用正確的Exchange與routingKey執(zhí)行成功,不會(huì)執(zhí)行我們的ReturnCallBack回退方法測(cè)試2:使用正確的Exchange與錯(cuò)誤的不存在的routingKey,就會(huì)執(zhí)行我們的ReturnCallBack回退方法*/rabbitTemplate.convertAndSend("test_Exchange_Confirm", "testConfirm111", "testConfirm~~~發(fā)送消息,測(cè)試回退模式");}1.3.3 測(cè)試消息可靠投遞
confirm測(cè)試
return測(cè)試
2.Consumer Ack
ack指Acknowledge,確認(rèn)。 表示消費(fèi)端收到消息后的確認(rèn)方式。
有三種確認(rèn)方式:
? 自動(dòng)確認(rèn):acknowledge=“none”
? 手動(dòng)確認(rèn):acknowledge=“manual”
? 根據(jù)異常情況確認(rèn):acknowledge=“auto”,(這種方式使用麻煩,不作講解)
其中自動(dòng)確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。如果設(shè)置了手動(dòng)確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動(dòng)簽收,如果出現(xiàn)異常,則調(diào)用channel.basicNack()方法,讓其自動(dòng)重新發(fā)送消息。
2.1 消費(fèi)端application.yml中配置
需要開(kāi)啟手動(dòng)簽收消息
listener:# RabbitMQ模式使用simple simple支持事務(wù)的simple:# Consumer ACK機(jī)制:設(shè)置為手動(dòng)簽收acknowledge-mode: manualprefetch: 1 # 限流,配置1 表示消費(fèi)端每次向MQ拉取最大一條消息2.2 在消費(fèi)端創(chuàng)建監(jiān)聽(tīng)類
在方法上使用下面的注解,監(jiān)聽(tīng)的隊(duì)列
@RabbitListener(queues = “隊(duì)列名稱”)
下面的代碼監(jiān)聽(tīng)的是我們上面測(cè)試confirm的隊(duì)列
package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;/*** Consumer ACK機(jī)制:默認(rèn)自動(dòng)簽收* 1. 設(shè)置手動(dòng)簽收。acknowledge="manual"* 2. 讓監(jiān)聽(tīng)器類實(shí)現(xiàn)ChannelAwareMessageListener接口* 3. 如果消息成功處理,則調(diào)用channel的 basicAck()簽收* 4. 如果消息處理失敗,則調(diào)用channel的basicNack()拒絕簽收,broker重新發(fā)送給consumer*/ @Component public class AckListener {@RabbitListener(queues = "test_Queue_Confirm")public void testAck(Message message, Channel channel) throws IOException {//得到消息的唯一deliveryTaglong deliveryTag = message.getMessageProperties().getDeliveryTag();//模擬接收到消息消費(fèi)的邏輯try{//接收到消息進(jìn)行消費(fèi)System.out.println(new String(message.getBody()));System.out.println("消息到了ACK機(jī)制中~~~");//模擬執(zhí)行邏輯錯(cuò)誤 // int i = 1/0;//手動(dòng)簽收消息/*deliveryTag:表示收到的消息的參數(shù)標(biāo)簽(消息的唯一id)第二個(gè)參數(shù):是否簽收多條消息(批量簽收消息)*/channel.basicAck(deliveryTag,true);}catch (Exception e){//當(dāng)我們上面的邏輯出現(xiàn)錯(cuò)誤,就不會(huì)簽收消息,我們?cè)赾atch中就執(zhí)行拒絕簽收System.out.println("消費(fèi)邏輯出現(xiàn)異常~~~消息被Ack機(jī)制重回隊(duì)列");//拒絕簽收/*第三個(gè)參數(shù):requeue:重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue的尾部,broker會(huì)重新發(fā)送該消息給消費(fèi)端,false為丟棄改消息,若設(shè)置了死信隊(duì)列,就會(huì)交給死信隊(duì)列*/channel.basicNack(deliveryTag,true,false);}}}2.3 測(cè)試ACK
啟動(dòng)主啟動(dòng):ConsumerSpringbootApplication
在提供方發(fā)送消息
在消費(fèi)方查看消息被消費(fèi)
3.TTL 全稱 Time To Live(存活時(shí)間/過(guò)期時(shí)間)
? TTL 全稱 Time To Live(存活時(shí)間/過(guò)期時(shí)間)。
? 當(dāng)消息到達(dá)存活時(shí)間后,還沒(méi)有被消費(fèi),會(huì)被自動(dòng)清除。
? RabbitMQ可以對(duì)消息設(shè)置過(guò)期時(shí)間,也可以對(duì)整個(gè)隊(duì)列(Queue)設(shè)置過(guò)期時(shí)間。
? 設(shè)置隊(duì)列過(guò)期時(shí)間使用參數(shù):x-message-ttl,單位:ms(毫秒),會(huì)對(duì)整個(gè)隊(duì)列消息統(tǒng)一過(guò)期。
? 設(shè)置消息過(guò)期時(shí)間使用參數(shù):expiration。單位:ms(毫秒),當(dāng)該消息在隊(duì)列頭部時(shí)(消費(fèi)時(shí)),會(huì)單獨(dú)判斷這一消息是否過(guò)期。
? 如果兩者都進(jìn)行了設(shè)置,以時(shí)間短的為準(zhǔn)。
3.1 在提供方編寫(xiě)TTL的交換機(jī)與隊(duì)列的創(chuàng)建代碼
package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /* 測(cè)試RabbitMQ的TTL*/ @Configuration public class RabbitMQConfigTTL {//創(chuàng)建交換機(jī)@Beanpublic Exchange exchangeTtl(){//創(chuàng)建一個(gè)Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列 的交換機(jī)return ExchangeBuilder.topicExchange("test_Exchange_TTL").build();}//創(chuàng)建隊(duì)列@Beanpublic Queue queueTtl(){//創(chuàng)建一個(gè)隊(duì)列,設(shè)置消息過(guò)期時(shí)間為10秒return QueueBuilder.durable("test_Queue_TTL").withArgument("x-message-ttl",10000).build();}//綁定交換機(jī)與隊(duì)列@Beanpublic Binding bindingTtl(@Qualifier("exchangeTtl") Exchange exchange, @Qualifier("queueTtl") Queue queue){//將隊(duì)列綁定在topic通配符交換機(jī)上設(shè)置路由規(guī)則routingKey,沒(méi)有參數(shù)return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();} }3.2 提供方測(cè)試代碼編寫(xiě)
/*** TTL:過(guò)期時(shí)間* 1. 隊(duì)列統(tǒng)一過(guò)期** 2. 消息單獨(dú)過(guò)期* 結(jié)果:* 如果設(shè)置了消息的過(guò)期時(shí)間,也設(shè)置了隊(duì)列的過(guò)期時(shí)間,它以時(shí)間短的為準(zhǔn)。* 隊(duì)列過(guò)期后,會(huì)將隊(duì)列所有消息全部移除。* 消息過(guò)期后,只有消息在隊(duì)列頂端,才會(huì)判斷其是否過(guò)期(移除掉)*/@Testpublic void testTTL(){//* 1. 隊(duì)列統(tǒng)一過(guò)期//發(fā)送10條消息,不去消費(fèi),查看web控制臺(tái)10秒后這10條消息是否會(huì)被丟棄 // for (int i = 0; i < 10; i++) { // //調(diào)用方法 // rabbitTemplate.convertAndSend("test_Exchange_TTL","ttl.hehe.xf","測(cè)試TTL超時(shí)時(shí)間隊(duì)列消息發(fā)送~~~"+i); // }//* 2. 消息單獨(dú)過(guò)期// 消息后處理對(duì)象,設(shè)置一些消息的參數(shù)信息,發(fā)送消息的時(shí)候傳遞該參數(shù),那么這些消息就會(huì)具有該參數(shù)//該對(duì)象是一個(gè)接口,使用匿名類部?jī)?nèi)來(lái)創(chuàng)建實(shí)現(xiàn)類MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {//設(shè)置發(fā)送消息的參數(shù)@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");//設(shè)置消息過(guò)期時(shí)間為5秒return message;}};//再次發(fā)生一條消息,使用我們?cè)O(shè)置好的消息參數(shù)對(duì)消息進(jìn)行封裝//發(fā)送成功后,去看我們的隊(duì)列中的這條消息是否是5秒過(guò)期,因?yàn)槲覀冞@個(gè)消息是在隊(duì)列的頂端,等待被消費(fèi),而且過(guò)期時(shí)間短于隊(duì)列統(tǒng)一時(shí)間,所以優(yōu)先我們這單條消息過(guò)期時(shí)間 // rabbitTemplate.convertAndSend("test_Exchange_TTL","ttl.hehe.fs","我被使用了消息參數(shù),5秒后過(guò)期~~~",messagePostProcessor);//我們?cè)俅螛O端的測(cè)試,讓我們這條消息不在隊(duì)列的頂端//這條i==5的消息設(shè)置5秒過(guò)期,但是他在隊(duì)列的中間,5秒后已經(jīng)過(guò)期,但是不會(huì)被隊(duì)列移除掉,當(dāng)隊(duì)列統(tǒng)一的過(guò)期時(shí)間到了,就會(huì)隨著統(tǒng)一被隊(duì)列丟棄 或者交給死信交換機(jī)//因?yàn)殛?duì)列只會(huì)移除隊(duì)列頂端的過(guò)期消息,例如當(dāng)有消費(fèi)者來(lái)消費(fèi)這10條消息后,但是i=5這條消息//已經(jīng)過(guò)期,當(dāng)消費(fèi)到這條消息時(shí),它就在隊(duì)列的頂端,就會(huì)判斷該消息是否過(guò)期,//若過(guò)期,者就會(huì)移除,或者交給 死信交換機(jī)//不會(huì)發(fā)送給消費(fèi)者消費(fèi)的for (int i = 0; i < 10; i++) {if (i == 5) {//消息單獨(dú)過(guò)期rabbitTemplate.convertAndSend("test_Exchange_TTL", "ttl.hehe.xf", "我被使用了消息參數(shù),5秒后過(guò)期~~~而且在隊(duì)列的中間,我會(huì)不會(huì)5秒后過(guò)期呢?", messagePostProcessor);} else {//不過(guò)期的消息rabbitTemplate.convertAndSend("test_Exchange_TTL", "ttl.hehe.xf", "我發(fā)送了消息....");}}}3.3 測(cè)試發(fā)送,查詢queue隊(duì)列中的消息存活時(shí)間
3.3.1 測(cè)試 隊(duì)列統(tǒng)一過(guò)期
將1. 隊(duì)列統(tǒng)一過(guò)期這段代碼注釋放開(kāi),把其余代碼注釋,然后點(diǎn)擊運(yùn)行
3.3.2 測(cè)試 消息單獨(dú)過(guò)期
使用這個(gè)類MessagePostProcessor來(lái)封裝我們發(fā)生消息的屬性參數(shù)
3.3.3 測(cè)試 ,讓我們這條消息不在隊(duì)列的頂端
4.死信隊(duì)列 DLX 。Dead Letter Exchange(死信交換機(jī))
死信隊(duì)列,英文縮寫(xiě):DLX 。Dead Letter Exchange(死信交換機(jī)),當(dāng)消息成為Dead message后,可以
被重新發(fā)送到另一個(gè)交換機(jī),這個(gè)交換機(jī)就是DLX。
消息成為死信的三種情況:
隊(duì)列綁定死信交換機(jī):
給隊(duì)列設(shè)置參數(shù): x-dead-letter-exchange 和 x-dead-letter-routing-key
4.1 創(chuàng)建用于測(cè)試死信隊(duì)列的交換機(jī)與隊(duì)列
死信隊(duì)列:
1. 聲明正常的隊(duì)列(test_queue_dlx)和正常交換機(jī)(test_exchange_dlx)
2. 聲明死信隊(duì)列(queue_dlx)和死信交換機(jī)(exchange_dlx)
3. 正常隊(duì)列綁定死信交換機(jī)
設(shè)置兩個(gè)參數(shù):
* x-dead-letter-exchange:死信交換機(jī)名稱
* x-dead-letter-routing-key:發(fā)送給死信交換機(jī)的routingkey
4.2 編寫(xiě)測(cè)試類發(fā)送測(cè)試死信消息
* 發(fā)送測(cè)試死信消息:* 1. 過(guò)期時(shí)間* 2. 長(zhǎng)度限制* 3. 消息拒收 /*** 發(fā)送測(cè)試死信消息:* 1. 過(guò)期時(shí)間* 2. 長(zhǎng)度限制* 3. 消息拒收*/@Testpublic void testDlx() throws InterruptedException {//測(cè)試過(guò)期時(shí)間,死信消息,首先發(fā)送給了正常的交換機(jī),交換機(jī)路由到正常的隊(duì)列,然后該隊(duì)列的消息由于設(shè)置了10秒過(guò)期,10秒內(nèi)沒(méi)有被消費(fèi)// 過(guò)期后就交給死信交換機(jī),然后由死信交換機(jī)路由到死信隊(duì)列,然后被消費(fèi)掉 // rabbitTemplate.convertAndSend("exchange_Normal_DLX","test.dlx.xf","我發(fā)送了一條10秒后就過(guò)期的消息~~~");//測(cè)試隊(duì)列長(zhǎng)度,當(dāng)一次性發(fā)送超過(guò)隊(duì)列長(zhǎng)度的消息,隊(duì)列就會(huì)將多余的消息交給死信交換機(jī)//由于我們創(chuàng)建隊(duì)列的時(shí)候,改隊(duì)列的長(zhǎng)度為10,那么就有10 條消息被第一時(shí)間交給死信交換機(jī),然后在等10秒,10秒后隊(duì)列中的10條消息沒(méi)有被消費(fèi),也會(huì)交給死信交換機(jī)//由執(zhí)行控制臺(tái)結(jié)果得知,隊(duì)列是先進(jìn)先出的原則先進(jìn)的0-9會(huì)被后進(jìn)的10-19擠出來(lái),所以0-9先變成死信消息,而10-19是10秒過(guò)期后未被消費(fèi)成的死信消息 // for (int i = 0; i < 20; i++) { // Thread.sleep(10); // rabbitTemplate.convertAndSend("exchange_Normal_DLX","test.dlx.xf","我發(fā)送了多條10秒后就過(guò)期的消息~~~"+i); // }//測(cè)試消費(fèi)端拒收消息,拒收的消息也不返回發(fā)送的隊(duì)列,就會(huì)變成死信消息,就交給死信交換機(jī)處理rabbitTemplate.convertAndSend("exchange_Normal_DLX","test.dlx.xf","我發(fā)送了消費(fèi)端出錯(cuò)不消費(fèi)的消息~~~");}4.3 編寫(xiě)消費(fèi)端的監(jiān)聽(tīng)隊(duì)列類 DlxListener 與 TestDlxListener
DlxListener
package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;/* 監(jiān)聽(tīng)死信隊(duì)列中的消息*/ @Component public class DlxListener {//監(jiān)聽(tīng)死信隊(duì)列@RabbitListener(queues = "queue_dlx")public void testDlx(Message message, Channel channel) throws IOException {//得到消息唯一標(biāo)識(shí)long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費(fèi)死信隊(duì)列中的消息System.out.println(new String(message.getBody()));//手動(dòng)關(guān)閉channel.basicAck(deliveryTag,true);}catch (Exception e){//上面代碼邏輯出現(xiàn)錯(cuò)誤e.printStackTrace();//拒絕接收,從新發(fā)送channel.basicNack(deliveryTag,true,true);}} }TestDlxListener
package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;/*監(jiān)聽(tīng)正常隊(duì)列的消息,然后異常拒收,也不返回給發(fā)送隊(duì)列,使消息成為死信消息,交給死信交換機(jī)*/ @Component public class TestDlxListener {@RabbitListener(queues = "queue_Normal_DLX")public void testDlxListener(Message message, Channel channel) throws IOException {//得到笑嘻嘻唯一標(biāo)識(shí)long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//消費(fèi)消息//模擬消費(fèi)出錯(cuò)int i = 1/0;//手動(dòng)提交channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("消息消費(fèi)出現(xiàn)異常,拒絕簽收");//拒絕接收消息//出錯(cuò)后將消息丟棄,不返回給發(fā)送隊(duì)列 拒絕簽收,不重回隊(duì)列 requeue=falsechannel.basicNack(deliveryTag,true,false);}} }4.4 測(cè)試
啟動(dòng)消費(fèi)端的主啟動(dòng)
4.4.1 測(cè)試 過(guò)期時(shí)間 消息過(guò)期后交給死信交換機(jī)被消費(fèi)掉
先將TestDlxListener類中的@Component注釋掉 ,將注釋掉的測(cè)試代碼打開(kāi),后面的代碼注釋掉,在run
4.4.2 測(cè)試隊(duì)列 長(zhǎng)度限制
將測(cè)試長(zhǎng)度限制的代碼放開(kāi),其余代碼注釋,點(diǎn)擊run
4.4.3 消息拒收
模擬業(yè)務(wù)錯(cuò)誤,啟動(dòng)消費(fèi)端主啟動(dòng)
4.5 死信隊(duì)列小結(jié)
消息成為死信的三種情況:
3. 隊(duì)列消息長(zhǎng)度到達(dá)限制;
4. 消費(fèi)者拒接消費(fèi)消息,并且不重回隊(duì)列;
5. 原隊(duì)列存在消息過(guò)期設(shè)置,消息到達(dá)超時(shí)時(shí)間未被消費(fèi);
5 延遲隊(duì)列
延遲隊(duì)列,即消息進(jìn)入隊(duì)列后不會(huì)立即被消費(fèi),只有到達(dá)指定時(shí)間后,才會(huì)被消費(fèi)。
很可惜,在RabbitMQ中并未提供延遲隊(duì)列功能。
但是可以使用:TTL+死信隊(duì)列 組合實(shí)現(xiàn)延遲隊(duì)列的效果。
5.1 提供方創(chuàng)建用于測(cè)試延遲隊(duì)列的交換機(jī)與隊(duì)列
package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/* RabbitMQ是沒(méi)有實(shí)現(xiàn)延遲隊(duì)列的延遲隊(duì)列的正常queue是沒(méi)有消費(fèi)者的,否則生產(chǎn)的消息會(huì)被立馬消費(fèi)掉,就不會(huì)交給死信交換機(jī),達(dá)不到延遲隊(duì)列效果但是我們可以通過(guò)使用TTL 加上(DLX)死信隊(duì)列組合實(shí)現(xiàn)延遲隊(duì)列的效果延遲隊(duì)列:1. 定義正常交換機(jī)(order_exchange)和隊(duì)列(order_queue)2. 定義死信交換機(jī)(order_exchange_dlx)和隊(duì)列(order_queue_dlx)3. 綁定,設(shè)置正常隊(duì)列過(guò)期時(shí)間為30分鐘*/ @Configuration public class RabbitMQDelayQueueConfig {//定義死信交換機(jī)(order_exchange_dlx)和隊(duì)列(order_queue_dlx)@Beanpublic Exchange orderExchangeDlx(){return ExchangeBuilder.topicExchange("order_exchange_dlx").build();}//定義死信隊(duì)列@Beanpublic Queue orderQueueDlx(){return QueueBuilder.durable("order_queue_dlx").build();}//將死信交換機(jī)與死信隊(duì)列相互綁定@Beanpublic Binding orderBindingDlx(@Qualifier("orderExchangeDlx") Exchange exchange,@Qualifier("orderQueueDlx") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();}//定義正常交換機(jī)(order_exchange)和隊(duì)列(order_queue)@Beanpublic Exchange orderExchange(){return ExchangeBuilder.topicExchange("order_exchange").build();}//定義隊(duì)列@Beanpublic Queue orderQueue(){return QueueBuilder.durable("order_queue").withArgument("x-dead-letter-exchange","order_exchange_dlx")//綁定死信交換機(jī).withArgument("x-dead-letter-routing-key","dlx.order.xf")//綁定routingKey value路由規(guī)則dlx.order.#.withArgument("x-message-ttl",10000)//給這個(gè)隊(duì)列添加過(guò)期時(shí)間 測(cè)試就使用10秒過(guò)期時(shí)間.build();}//將正常交換機(jī)與隊(duì)列相互綁定@Beanpublic Binding orderBinding(@Qualifier("orderExchange") Exchange exchange,@Qualifier("orderQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();} }5.2 消費(fèi)端編寫(xiě)監(jiān)聽(tīng)測(cè)試延遲隊(duì)列的隊(duì)列
package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component;/* 測(cè)試 延遲隊(duì)列效果實(shí)現(xiàn) 消費(fèi)死信隊(duì)列中的消息*/ @Component public class OrderListener implements ChannelAwareMessageListener {/*監(jiān)聽(tīng)死信隊(duì)列的消息,并消費(fèi)*/@RabbitListener(queues = "order_queue_dlx")public void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1.接收轉(zhuǎn)換消息System.out.println(new String(message.getBody()));//2. 模擬處理業(yè)務(wù)邏輯System.out.println("處理業(yè)務(wù)邏輯...");System.out.println("根據(jù)訂單id查詢其狀態(tài)...");System.out.println("判斷狀態(tài)是否為支付成功");System.out.println("未支付,取消訂單,回滾庫(kù)存....");//3. 手動(dòng)簽收channel.basicAck(deliveryTag,true);} catch (Exception e) {//e.printStackTrace();System.out.println("出現(xiàn)異常,拒絕接受");//4.拒絕簽收,不重回隊(duì)列 requeue=falsechannel.basicNack(deliveryTag,true,false);}} }5.3 編寫(xiě)測(cè)試代碼,發(fā)送消息
/* 發(fā)送消息到隊(duì)列中,消息10秒到期,然后消費(fèi)端監(jiān)聽(tīng)死信隊(duì)列,并消費(fèi)*/@Testpublic void testDelay() throws InterruptedException {//1.發(fā)送訂單消息。 將來(lái)是在訂單系統(tǒng)中,下單成功后,發(fā)送消息rabbitTemplate.convertAndSend("order_exchange", "order.msg", "訂單信息:id=1,time=2019年8月17日16:41:47");//2.打印倒計(jì)時(shí)10秒,模擬消息等待10秒后消息過(guò)期后,在消費(fèi)端消費(fèi)死信for (int i = 10; i > 0 ; i--) {System.out.println(i+"...");Thread.sleep(1000);}} }5.4 測(cè)試
啟動(dòng)消費(fèi)端主啟動(dòng)
run測(cè)試方法
6 日志與監(jiān)控
RabbitMQ默認(rèn)日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本號(hào)、Erlang的版本號(hào)、RabbitMQ服務(wù)節(jié)點(diǎn)名稱、cookie的hash值、
RabbitMQ配置文件地址、內(nèi)存限制、磁盤(pán)限制、默認(rèn)賬戶guest的創(chuàng)建以及權(quán)限配置等等。
6.1 命令
7 消息追蹤
在使用任何消息中間件的過(guò)程中,難免會(huì)出現(xiàn)某條消息異常丟失的情況。對(duì)于RabbitMQ而言,可能
是因?yàn)樯a(chǎn)者或消費(fèi)者與RabbitMQ斷開(kāi)了連接,而它們與RabbitMQ又采用了不同的確認(rèn)機(jī)制;也
有可能是因?yàn)榻粨Q器與隊(duì)列之間不同的轉(zhuǎn)發(fā)策略;甚至是交換器并沒(méi)有與任何隊(duì)列進(jìn)行綁定,生產(chǎn)者
又不感知或者沒(méi)有采取相應(yīng)的措施;另外RabbitMQ本身的集群策略也可能導(dǎo)致消息的丟失。這個(gè)時(shí)
候就需要有一個(gè)較好的機(jī)制跟蹤記錄消息的投遞過(guò)程,以此協(xié)助開(kāi)發(fā)和運(yùn)維人員進(jìn)行問(wèn)題的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來(lái)實(shí)現(xiàn)消息追蹤。
7.1 消息追蹤-Firehose
firehose的機(jī)制是將生產(chǎn)者投遞給rabbitmq的消息,rabbitmq投遞給消費(fèi)者的消息按照指定的格式
發(fā)送到默認(rèn)的exchange上。這個(gè)默認(rèn)的exchange的名稱為amq.rabbitmq.trace,它是一個(gè)topic類
型的exchange。發(fā)送到這個(gè)exchange上的消息的routing key為 publish.exchangename 和
deliver.queuename。其中exchangename和queuename為實(shí)際exchange和queue的名稱,分別
對(duì)應(yīng)生產(chǎn)者投遞到exchange的消息,和消費(fèi)者從queue上獲取的消息。
注意:打開(kāi) trace 會(huì)影響消息寫(xiě)入功能,適當(dāng)打開(kāi)后請(qǐng)關(guān)閉。
rabbitmqctl trace_on:開(kāi)啟Firehose命令
rabbitmqctl trace_off:關(guān)閉Firehose命令
7.2 消息追蹤-rabbitmq_tracing
rabbitmq_tracing和Firehose在實(shí)現(xiàn)上如出一轍,只不過(guò)rabbitmq_tracing的方式比Firehose多了一
層GUI的包裝,更容易使用和管理。
啟用插件:rabbitmq-plugins enable rabbitmq_tracing
8 消息可靠性保障
100%確保消息發(fā)送成功
8.1 消息可靠性保障–消息補(bǔ)償
8.2 消息冪等性保障–樂(lè)觀鎖機(jī)制
冪等性指一次和多次請(qǐng)求某一個(gè)資源,對(duì)于資源本身應(yīng)該具有同樣的結(jié)果。也就是說(shuō),其任
意多次執(zhí)行對(duì)資源本身所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
在MQ中指,消費(fèi)多條相同的消息,得到與消費(fèi)該消息一次相同的結(jié)果。
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: RabbitMQ,RabbitMQ 的工
- 下一篇: ElasticSearch,docker