當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
SpringBoot RabbitMQ 集成 七 延迟队列
生活随笔
收集整理的這篇文章主要介紹了
SpringBoot RabbitMQ 集成 七 延迟队列
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
為什么80%的碼農都做不了架構師?>>> ??
何為延遲隊列?
顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而一般的隊列,消息一旦入隊了之后就會被消費者馬上消費。
延遲消費是延遲隊列最為常用的使用模式。如下圖所示,生產者產生的消息首先會進入緩沖隊列(圖中紅色隊列)。通過RabbitMQ提供的TTL擴展,這些消息會被設置過期時間,也就是延遲消費的時間。等消息過期之后,這些消息會通過配置好的DLX轉發到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。
?
一、編寫代碼
1、編寫常量類RabbitDeadQueueConstant?
package com.lvgang.springbootrabbitmq.deadqueue;/*** @author lvgang*/ public class RabbitDeadQueueConstant {/*** 死信隊列*/public static final String DL_QUEUQ = "QUEUE_DL";/*** 轉發隊列*/public static final String REDIRECT_QUEUE = "QUEUE_REDIRECT";/*** 死信EXCHANGE*/public static final String DL_EXCHANGE = "EXCHANGE_DL";/*** 發送死信隊列KEY*/public static final String DL_QUEUQ_KEY ="KEY_DL";/*** 發送到轉發隊列KEY*/public static final String REDIRECT_QUEUE_KEY ="KEY_R";}2、編寫配置類RabbitDeadQueueConfig?
package com.lvgang.springbootrabbitmq.deadqueue;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;/*** @author lvgang*/ @Configuration public class RabbitDeadQueueConfig {private static Logger logger = LoggerFactory.getLogger(RabbitDeadQueueConfig.class);/*** Queue 可以有4個參數* 1.隊列名* 2.durable 持久化消息隊列 ,rabbitmq重啟的時候不需要創建新的隊列 默認true* 3.auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認是false* 4.exclusive 表示該消息隊列是否只在當前connection生效,默認是false*//*** 死信隊列跟交換機類型沒有關系 不一定為directExchange 不影響該類型交換機的特性.** @return the exchange*/@Beanpublic Exchange deadLetterExchange() {logger.info("創建deadLetterExchange成功");return ExchangeBuilder.directExchange(RabbitDeadQueueConstant.DL_EXCHANGE).durable(true).build();}/*** 聲明一個死信隊列.* x-dead-letter-exchange 對應 死信交換機* x-dead-letter-routing-key 對應 死信隊列** @return the queue*/@Beanpublic Queue deadLetterQueue() {logger.info("創建deadLetterQueue成功");Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 聲明 死信交換機args.put("x-dead-letter-exchange", RabbitDeadQueueConstant.DL_EXCHANGE); // x-dead-letter-routing-key 聲明 死信路由鍵args.put("x-dead-letter-routing-key", RabbitDeadQueueConstant.REDIRECT_QUEUE_KEY);return QueueBuilder.durable(RabbitDeadQueueConstant.DL_QUEUQ).withArguments(args).build();}/*** 定義死信隊列轉發隊列.** @return the queue*/@Beanpublic Queue redirectQueue() {logger.info("創建redirectQueue成功");return QueueBuilder.durable(RabbitDeadQueueConstant.REDIRECT_QUEUE).build();}/*** 死信路由通過 DL_KEY 綁定鍵綁定到死信隊列上.* @return the binding*/@Beanpublic Binding deadLetterBinding() {logger.info("綁定deadLetterQueue到deadLetterExchange成功");return new Binding(RabbitDeadQueueConstant.DL_QUEUQ, Binding.DestinationType.QUEUE,RabbitDeadQueueConstant.DL_EXCHANGE, RabbitDeadQueueConstant.DL_QUEUQ_KEY, null);}/*** 死信路由通過 KEY_R 綁定鍵綁定到最終處理隊列上.* @return the binding*/@Beanpublic Binding redirectBinding() {logger.info("綁定redirectQueue到deadLetterExchange成功");return new Binding(RabbitDeadQueueConstant.REDIRECT_QUEUE, Binding.DestinationType.QUEUE,RabbitDeadQueueConstant.DL_EXCHANGE, RabbitDeadQueueConstant.REDIRECT_QUEUE_KEY, null);}}?
3、編寫消息生產者DeadQueueSender?
package com.lvgang.springbootrabbitmq.deadqueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.util.Date; import java.util.UUID;/*** @author lvgang*/ @Component public class DeadQueueSender implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {private static Logger logger = LoggerFactory.getLogger(DeadQueueReceiver.class);@Autowiredprivate RabbitTemplate rabbitTemplate;public void send() {//設置回調對象this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnCallback(this);CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());String content = "DeadQueue= " + new Date() + ", content= " + UUID.randomUUID().toString();MessagePostProcessor messagePostProcessor = message -> {MessageProperties messageProperties = message.getMessageProperties(); // 設置編碼messageProperties.setContentEncoding("utf-8"); // 設置過期時間10*1000毫秒messageProperties.setExpiration("10000");return message;}; // 向DL_QUEUE 發送消息 10*1000毫秒后過期 形成死信rabbitTemplate.convertAndSend(RabbitDeadQueueConstant.DL_EXCHANGE, RabbitDeadQueueConstant.DL_QUEUQ_KEY, content, messagePostProcessor, correlationData);logger.info("Send ok,"+new Date()+","+content);}/*** 消息回調確認方法* 如果消息沒有到exchange,則confirm回調,ack=false* 如果消息到達exchange,則confirm回調,ack=true* @param*/@Overridepublic void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {//logger.info("confirm--message:回調消息ID為: " + correlationData.getId());if (isSendSuccess) {//logger.info("confirm--message:消息發送成功");} else {logger.info("confirm--message:消息發送失敗" + s);}}/*** exchange到queue成功,則不回調return* exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.info("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);} }4、編寫消息消費者? DeadQueueReceiver?
package com.lvgang.springbootrabbitmq.deadqueue;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel; import java.util.Date;/*** @author lvgang*/ @Component @RabbitListener(queues = RabbitDeadQueueConstant.REDIRECT_QUEUE) public class DeadQueueReceiver {private static Logger logger = LoggerFactory.getLogger(DeadQueueReceiver.class);@RabbitHandlerpublic void process(String hello, Message message, Channel channel) {try {//告訴服務器收到這條消息 已經被我消費了 可以在隊列刪掉 這樣以后就不會再發了 否則消息服務器以為這條消息沒處理掉 后續還會在發channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);logger.info("消息消費成功!");} catch (Exception e) {logger.error("消息消費失敗:"+e.getMessage(),e);//丟棄這條消息//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);}logger.info("Receiver : " + hello +","+ new Date());} }?
二、測試結果
1、編寫測試類TopicTests?
package com.lvgang.springbootrabbitmq.deadqueue;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /*** @author lvgang*/ @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class DeadQueueTests {@Autowiredprivate DeadQueueSender deadQueueSender;@Testpublic void hello() {int i=1;while (true) {try {if(i==1) {deadQueueSender.send();}i++;Thread.sleep(1000);} catch (Exception e) {;}}} }?
2、執行測試類,并查看結果
通過執行測試類,查看到了消息消費的情況,生產者共計生產了1個消息,被消費者消費了一次,但發送消息時間及實際消費時間差10秒鐘。
轉載于:https://my.oschina.net/sdlvzg/blog/3045834
總結
以上是生活随笔為你收集整理的SpringBoot RabbitMQ 集成 七 延迟队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java B2B2C Springclo
- 下一篇: EXPLAIN说明