RabbitMQ 延迟队列详解
一、延遲隊列概念
延遲隊列存儲的對象是對應(yīng)的延遲消息,所謂“延遲消息”是指當消息被發(fā)送以后,并不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費。
二、延遲隊列使用場景
1、訂單在十分鐘之內(nèi)未支付則自動取消
2、新創(chuàng)建的店鋪,如果在十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒。
3、用戶注冊成功后,如果三天內(nèi)沒有登錄則進行短信提醒。
4、用戶發(fā)起退款后,如果三天內(nèi)沒有得到處理則通知相關(guān)運營人員。
5、預(yù)定會議后,需要在預(yù)定時間點前十分鐘通知各個與會人員參加會議。
這些場景都有一個特點,需要在某個時間發(fā)生之后或者之前的指定時間點完成某一項任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進行關(guān)閉。看起來似乎使用定時任務(wù),一直輪詢數(shù)據(jù),每秒查一次,然后取出需要被處理的數(shù)據(jù)進行處理就可以了。如果數(shù)據(jù)量比較少,確實可以這樣做,比如:對于“如果賬單一周內(nèi)未支付則進行自動結(jié)算”這樣的需求,如果對于時間不是嚴格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務(wù)檢查一下所有未支付的賬單,確實也是一個可行的方案。但對于數(shù)據(jù)量比較大,并且時效性較強的場景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉”,短期內(nèi)未支付的訂單數(shù)據(jù)可能會很多,活動期間甚至?xí)_到百萬甚至千萬級別,對這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無法完成所有訂單的檢查,同時會給數(shù)據(jù)庫帶來很大壓力,無法滿足業(yè)務(wù)要求而且性能低下。
三、RabbitMQ 中的 TTL
TTL 是 RabbitMQ 中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間,單位是毫秒。目前有兩種方法可以設(shè)置消息的 TTL。第一種方法是通過隊列屬性設(shè)置,隊列中所有消息都有相同的過期時間。第二種方法是對消息本身進行單獨設(shè)置,每條消息的 TTL 可以不同。如果兩種方法一起使用,則消息的 TTL 以兩者之間較小的那個數(shù)值為準。消息在隊列中的生存時間一旦超過設(shè)置的 TTL 值時,就會變成“死信”。
3.1 消息設(shè)置 TTL
針對每條消息設(shè)置 TTL 的方法時在 channel.basicPublish 方法中加入 expiration 的屬性參數(shù),單位為毫秒。
代碼示例:
// 設(shè)置消息 TTL 過期時間為 10s AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); String message = "info"; channel.basicPublish(EXCHANGE_NAME, "zhangsan", properties, message.getBytes());3.2 設(shè)置隊列 TTL
通過 channel.queueDeclare 方法中的 x-expires 參數(shù)可以控制隊列被自動刪除前處于未使用狀態(tài)的時間。未使用的意思是隊列上沒有任何的消費者,隊列也沒有被重新聲明,并且在過期時間段內(nèi)也未調(diào)用過 Basic.get 命令。
RabbitMQ 會確保在過期時間到達后將隊列刪除,但是不保障刪除的動作有多及時。在 RabbitMQ 重啟后,持久化的隊列的過期時間會被重新計算。
用于表示過期時間的 x-expires 參數(shù)以毫秒為單位,并且服從和 x-message-ttl 一樣的約束條件,不過不能設(shè)置為 0。比如該參數(shù)設(shè)置為 1000,則表示該隊列如果在 1 秒鐘之內(nèi)未使用則會被刪除。
代碼示例:
Map<String,Object> args = new HashMap<>(); args.put("x-expires", 1800000); channel.queueDeclare("myqueue",false, false, false, args);3.3 兩者的區(qū)別
如果設(shè)置了隊列的 TTL 屬性,那么一旦消息過期,就會被隊列丟棄(如果配置了死信隊列則會被丟到死信隊列中),而第二種方式,消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間;另外,還需要注意一點是,如果不設(shè)置 TTL,表示消息永遠不會過期,如果將 TTL 設(shè)置為 0,則表示除非此時可以直接投遞該消息到消費者,否則該消息將會別丟棄。
四、SpringBoot 整合 RabbitMQ
4.1 添加依賴
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.80</version></dependency> </dependencies>4.2 修改配置文件
spring:rabbitmq:host: IP地址username: adminpassword: adminport: 5672virtual-host: /test五、隊列 TTL
5.1 代碼框架圖
創(chuàng)建兩個隊列 QA 和 QB,兩個隊列 TTL 分別設(shè)置為 10s 和 40s,然后再創(chuàng)建一個交換機 X 和死信交換機 Y,它們的類型都是 direct,創(chuàng)建一個死信隊列 QD,它們的綁定關(guān)系如下:
5.2 配置文件類代碼
@Configuration public class TtlQueueConfig {/*** 普通交換機名稱*/public static final String X_EXCHANGE = "X";/*** 死信交換機名稱*/public static final String Y_DEAD_LETTER_EXCHANGE = "Y";/*** 普通隊列名稱*/public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";/*** 死信隊列名稱*/public static final String DEAD_LETTER_QUEUE = "QD";/*** 聲明 XExchange*/@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}/*** 聲明 yExchange*/@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}/*** 聲明隊列QA*/@Beanpublic Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);// 設(shè)置死信交換機arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 設(shè)置死信路由鍵arguments.put("x-dead-letter-routing-key", "YD");// 設(shè)置過期時間arguments.put("x-message-ttl", 10000);return new Queue(QUEUE_A, true, false, false, arguments);}/*** 聲明隊列QB*/@Beanpublic Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);// 設(shè)置死信交換機arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 設(shè)置死信路由鍵arguments.put("x-dead-letter-routing-key", "YD");// 設(shè)置過期時間arguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}/*** 死信隊列QD*/@Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}/*** 綁定*/@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}@Beanpublic Binding queueBBindingX(){return new Binding(QUEUE_B, Binding.DestinationType.QUEUE, X_EXCHANGE, "XB", null);}@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");} }5.3 消息生產(chǎn)者代碼
@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public String sendMsg(@PathVariable String message){log.info("當前時間:{}發(fā)送一條消息{}給兩個隊列", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息來自TTL為10s隊列QA:"+message);rabbitTemplate.convertAndSend("X", "XB", "消息來自TTL為40s隊列QB:"+message);return "發(fā)送成功";} }5.4 消息消費者代碼
@Slf4j @Component public class DeadLetterConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message){String msg = new String(message.getBody());log.info("當前時間{},收到死信隊列的消息:{}", new Date(), msg);} }發(fā)送一個請求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
第一條消息在 10s 后變成了死信消息,然后被消費者消費掉了,第二條消息在 40s 之后變成了死信消息,然后被消費掉,這樣一個延時隊列就完成了。
不過,如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個隊列,這里只有 10s 和 40s 兩個時間選項,如果需要一個小時后處理,那么就需要增加 TTL 為一個小時的隊列,如果是預(yù)定會議室,然后提前通知這樣的場景,豈不是要增加無數(shù)個隊列才能滿足需求?
六、延時隊列優(yōu)化
6.1 代碼架構(gòu)圖
在這里新增了一個隊列 QC,綁定關(guān)系如下,該隊列不設(shè)置 TTL 時間
6.2 配置類文件代碼
@Component public class MsgTtlQueueConfig {private static final String Y_DEAD_LETTER_EXCHANGE = "Y";private static final String QUEUE_C = "QC";@Bean("queueC")public Queue queueC(){Map<String, Object> arguments = new HashMap<>(2);// 聲明當前隊列綁定的死信交換機arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 聲明當前隊列的私信路由keyarguments.put("x-dead-letter-routing-key", "YD");return new Queue(QUEUE_C, false, false, false, arguments);}@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");} }6.3 消息生產(chǎn)者代碼
@GetMapping("/sendMsg/{message}/{ttlTime}") public String sendMsg(@PathVariable String message, @PathVariable String ttlTime){MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(ttlTime);return message;}};rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);return "發(fā)送成功"; }可以改為 Lambda 表達式:
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")public String sendMsg(@PathVariable String message, @PathVariable String ttlTime){rabbitTemplate.convertAndSend("X", "XC", message, (messagePostProcessor) -> {messagePostProcessor.getMessageProperties().setExpiration(ttlTime);return messagePostProcessor;});return "發(fā)送成功";}MessagePostProcessor 是一個函數(shù)式接口,通常使用 lambda 表達式來實現(xiàn):
amqpTemplate.convertAndSend(routingKey, m -> {m.getMessageProperties().setDeliveryMode(DeliveryMode.NON_PERSISTENT);return m; });該接口會在框架中的幾個地方使用,例如 AmqpTemplateconvertAndSend(Object, MessagePostProcessor) ,它可用于在執(zhí)行消息轉(zhuǎn)換后添加或者修改標頭或?qū)傩浴K€可用于在監(jiān)聽器容器和 AmqpTemplate 中接收消息時修改入站消息。
@FunctionalInterface public interface MessagePostProcessor {/*** 用于修改或替換消息*/Message postProcessMessage(Message message) throws AmqpException;/*** 用于修改或替換消息,也可修改消息的相關(guān)數(shù)據(jù)。僅適用于出站消息*/default Message postProcessMessage(Message message, Correlation correlation) {return postProcessMessage(message);}/*** 用于修改或替換消息,也可修改消息的相關(guān)數(shù)據(jù)。僅適用于出站消息*/default Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {return postProcessMessage(message, correlation);}}將程序執(zhí)行,然后發(fā)送請求:
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
兩條消息的過期時間一致,過期時間短的那條消息,在過期時間到了以后并沒有立即被消費,而是和過期時間長的那條消息一起被消費了。所以,如果使用在消息屬性上設(shè)置 TTL 的方式,消息可能并不會按時“死亡”,因為 RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列,如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優(yōu)先得到執(zhí)行。
七、Rabbitmq 插件實現(xiàn)延遲隊列
上面提到的問題,確實是一個問題,如果不能實現(xiàn)在消息粒度上的 TTL,并使其在設(shè)置的 TTL 時間及時死亡,就無法設(shè)計成一個通用的延時隊列。
7.1 Docker 安裝延時隊列插件
安裝教程可以參照這位大佬的文章:docker 安裝 rabbitmq并添加延遲隊列插件
注意:我在安裝完成延時隊列插件后無法登錄 RabbitMQ 的后臺管理系統(tǒng),一直提示不是私密連接,懷疑是安裝插件后,賬戶被清除了,重新創(chuàng)建一下就可以了。
進入docker:docker exec -it rabbitmq /bin/bash添加賬號:rabbitmqctl add_user admin admin設(shè)置角色:rabbitmqctl set_user_tags admin administrator設(shè)置權(quán)限:rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"登錄后就可以看到了:
7.2 代碼架構(gòu)圖
在這里新增了一個隊列 delayed.queue,一個自定義交換機 delayed.exchange,綁定關(guān)系如下:
7.3 配置文件類代碼
在我們自定義的交換機中,這是一種新的交換機類型,該類型消息支持延遲投遞機制,消息傳遞后并不會立即投遞到目標隊列中,而是存儲在 mnesia(一個分布式數(shù)據(jù)系統(tǒng))表中,當達到投遞時間時,才投遞到目標隊列中。
@Configuration public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";@Bean("delayedQueue")public Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}/*** 自定義交換機 定義一個延遲交換機* 不需要死信交換機和死信隊列,支持消息延遲投遞,消息投遞之后沒有到達投遞時間,是不會投遞給隊列* 而是存儲在一個分布式表,當投遞時間到達,才會投遞到目標隊列* @return*/@Bean("delayedExchange")public CustomExchange delayedExchange(){Map<String, Object> args = new HashMap<>(1);// 自定義交換機的類型args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();} }7.4 消息生產(chǎn)者代碼
@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";@GetMapping("/sendDelayMsg/{message}/{delayTime}")public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime){rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, messagePostProcessor ->{messagePostProcessor.getMessageProperties().setDelay(delayTime);return messagePostProcessor;});log.info("當前時間:{},發(fā)送一條延遲{}毫秒的信息給隊列delay.queue:{}", new Date(), delayTime, message);return "發(fā)送成功";} }7.5 消息消費者代碼
@Slf4j @Component public class DeadLetterConsumer {public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("當前時間:{},收到延時隊列的消息:{}", new Date(), msg);} }發(fā)起請求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
第二條消息被先消費掉了,符合預(yù)期
八、總結(jié)
延時隊列在需要延時處理的場景下非常有用,使用 RabbitMQ 來實現(xiàn)延時隊列可以很好地利用 RabbitMQ 的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊列,來保證消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好要的解決單點故障問題,不會因為單個節(jié)點掛掉導(dǎo)致延時隊列不可用或者消息丟失。
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ 延迟队列详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 傻瓜函数式编程
- 下一篇: 开题:轴承的剩余寿命预测(为什么要长时间