【RabbitMQ】一文带你搞定RabbitMQ延迟队列
本文口味:魚香肉絲? ?預(yù)計閱讀:10分鐘
0|1一、說明
在上一篇中,介紹了RabbitMQ中的死信隊列是什么,何時使用以及如何使用RabbitMQ的死信隊列。相信通過上一篇的學(xué)習(xí),對于死信隊列已經(jīng)有了更多的了解,這一篇的內(nèi)容也跟死信隊列息息相關(guān),如果你還不了解死信隊列,那么建議你先進(jìn)行上一篇文章的閱讀。
這一篇里,我們將繼續(xù)介紹RabbitMQ的高級特性,通過本篇的學(xué)習(xí),你將收獲:
0|1二、本文大綱
以下是本文大綱:
本文閱讀前,需要對RabbitMQ以及死信隊列有一個簡單的了解。
0|1三、什么是延時隊列
延時隊列,首先,它是一種隊列,隊列意味著內(nèi)部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進(jìn)入,從另一端取出。
其次,延時隊列,最重要的特性就體現(xiàn)在它的延時屬性上,跟普通的隊列不一樣的是,普通隊列中的元素總是等著希望被早點(diǎn)取出處理,而延時隊列中的元素則是希望被在指定時間得到取出和處理,所以延時隊列中的元素是都是帶時間屬性的,通常來說是需要被處理的消息或者任務(wù)。
簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列。
0|1四、延時隊列使用場景
那么什么時候需要用延時隊列呢?考慮一下以下場景:
這些場景都有一個特點(diǎn),需要在某個事件發(fā)生之后或者之前的指定時間點(diǎn)完成某一項(xiàng)任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進(jìn)行關(guān)閉;發(fā)生店鋪創(chuàng)建事件,十天后檢查該店鋪上新商品數(shù),然后通知上新數(shù)為0的商戶;發(fā)生賬單生成事件,檢查賬單支付狀態(tài),然后自動結(jié)算未支付的賬單;發(fā)生新用戶注冊事件,三天后檢查新注冊用戶的活動數(shù)據(jù),然后通知沒有任何活動記錄的用戶;發(fā)生退款事件,在三天之后檢查該訂單是否已被處理,如仍未被處理,則發(fā)送消息給相關(guān)運(yùn)營人員;發(fā)生預(yù)定會議事件,判斷離會議開始是否只有十分鐘了,如果是,則通知各個與會人員。
看起來似乎使用定時任務(wù),一直輪詢數(shù)據(jù),每秒查一次,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎?如果數(shù)據(jù)量比較少,確實(shí)可以這樣做,比如:對于“如果賬單一周內(nèi)未支付則進(jìn)行自動結(jié)算”這樣的需求,如果對于時間不是嚴(yán)格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個可行的方案。但對于數(shù)據(jù)量比較大,并且時效性較強(qiáng)的場景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“,短期內(nèi)未支付的訂單數(shù)據(jù)可能會有很多,活動期間甚至?xí)_(dá)到百萬甚至千萬級別,對這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無法完成所有訂單的檢查,同時會給數(shù)據(jù)庫帶來很大壓力,無法滿足業(yè)務(wù)要求而且性能低下。
更重要的一點(diǎn)是,不!優(yōu)!雅!
沒錯,作為一名有追求的程序員,始終應(yīng)該追求更優(yōu)雅的架構(gòu)和更優(yōu)雅的代碼風(fēng)格,寫代碼要像寫詩一樣優(yōu)美。【滑稽】
這時候,延時隊列就可以閃亮登場了,以上場景,正是延時隊列的用武之地。
既然延時隊列可以解決很多特定場景下,帶時間屬性的任務(wù)需求,那么如何構(gòu)造一個延時隊列呢?接下來,本文將介紹如何用RabbitMQ來實(shí)現(xiàn)延時隊列。
0|1五、RabbitMQ中的TTL
在介紹延時隊列之前,還需要先介紹一下RabbitMQ中的一個高級特性——TTL(Time To Live)。
TTL是什么呢?TTL是RabbitMQ中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間,單位是毫秒。換句話說,如果一條消息設(shè)置了TTL屬性或者進(jìn)入了設(shè)置TTL屬性的隊列,那么這條消息如果在TTL設(shè)置的時間內(nèi)沒有被消費(fèi),則會成為“死信”(至于什么是死信,請翻看上一篇)。如果同時配置了隊列的TTL和消息的TTL,那么較小的那個值將會被使用。
那么,如何設(shè)置這個TTL值呢?有兩種方式,第一種是在創(chuàng)建隊列的時候設(shè)置隊列的“x-message-ttl”屬性,如下:
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
這樣所有被投遞到該隊列的消息都最多不會存活超過6s。
另一種方式便是針對每條消息設(shè)置TTL,代碼如下:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
這樣這條消息的過期時間也被設(shè)置成了6s。
但這兩種方式是有區(qū)別的,如果設(shè)置了隊列的TTL屬性,那么一旦消息過期,就會被隊列丟棄,而第二種方式,消息即使過期,也不一定會被馬上丟棄,因?yàn)橄⑹欠襁^期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊列有嚴(yán)重的消息積壓情況,則已過期的消息也許還能存活較長時間。
另外,還需要注意的一點(diǎn)是,如果不設(shè)置TTL,表示消息永遠(yuǎn)不會過期,如果將TTL設(shè)置為0,則表示除非此時可以直接投遞該消息到消費(fèi)者,否則該消息將會被丟棄。
0|1六、如何利用RabbitMQ實(shí)現(xiàn)延時隊列
前一篇里介紹了如果設(shè)置死信隊列,前文中又介紹了TTL,至此,利用RabbitMQ實(shí)現(xiàn)延時隊列的兩大要素已經(jīng)集齊,接下來只需要將它們進(jìn)行調(diào)和,再加入一點(diǎn)點(diǎn)調(diào)味料,延時隊列就可以新鮮出爐了。
想想看,延時隊列,不就是想要消息延遲多久被處理嗎,TTL則剛好能讓消息在延遲多久之后成為死信,另一方面,成為死信的消息都會被投遞到死信隊列里,這樣只需要消費(fèi)者一直消費(fèi)死信隊列里的消息就萬事大吉了,因?yàn)槔锩娴南⒍际窍M涣⒓刺幚淼南ⅰ?/p>
從下圖可以大致看出消息的流向:
生產(chǎn)者生產(chǎn)一條延時消息,根據(jù)需要延時時間的不同,利用不同的routingkey將消息路由到不同的延時隊列,每個隊列都設(shè)置了不同的TTL屬性,并綁定在同一個死信交換機(jī)中,消息過期后,根據(jù)routingkey的不同,又會被路由到不同的死信隊列中,消費(fèi)者只需要監(jiān)聽對應(yīng)的死信隊列進(jìn)行處理即可。
下面來看代碼:
先聲明交換機(jī)、隊列以及他們的綁定關(guān)系:
@Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea"; public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb"; public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey"; public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey"; public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea"; public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb"; // 聲明延時Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 聲明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 聲明延時隊列A 延時10s // 并綁定到對應(yīng)的死信交換機(jī) @Bean("delayQueueA") public Queue delayQueueA(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 這里聲明當(dāng)前隊列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); // x-message-ttl 聲明隊列的TTL args.put("x-message-ttl", 6000); return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build(); } // 聲明延時隊列B 延時 60s // 并綁定到對應(yīng)的死信交換機(jī) @Bean("delayQueueB") public Queue delayQueueB(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 這里聲明當(dāng)前隊列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY); // x-message-ttl 聲明隊列的TTL args.put("x-message-ttl", 60000); return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build(); } // 聲明死信隊列A 用于接收延時10s處理的消息 @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 聲明死信隊列B 用于接收延時60s處理的消息 @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUEB_NAME); } // 聲明延時隊列A綁定關(guān)系 @Bean public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY); } // 聲明業(yè)務(wù)隊列B綁定關(guān)系 @Bean public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY); } // 聲明死信隊列A綁定關(guān)系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 聲明死信隊列B綁定關(guān)系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY); } }
接下來,創(chuàng)建兩個消費(fèi)者,分別對兩個死信隊列的消息進(jìn)行消費(fèi):
@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時間:{},死信隊列A收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時間:{},死信隊列B收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
然后是消息的生產(chǎn)者:
@Component public class DelayMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg, DelayTypeEnum type){ switch (type){ case DELAY_10s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg); break; case DELAY_60s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg); break; } } }
接下來,我們暴露一個web接口來生產(chǎn)消息:
@Slf4j @RequestMapping("rabbitmq") @RestController public class RabbitMQMsgController { @Autowired private DelayMessageSender sender; @RequestMapping("sendmsg") public void sendMsg(String msg, Integer delayType){ log.info("當(dāng)前時間:{},收到請求,msg:{},delayType:{}", new Date(), msg, delayType); sender.sendMsg(msg, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnumByValue(delayType))); } }
準(zhǔn)備就緒,啟動!
打開rabbitMQ的管理后臺,可以看到我們剛才創(chuàng)建的交換機(jī)和隊列信息:
接下來,我們來發(fā)送幾條消息,http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1?http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2
日志如下:
2019-07-28 16:02:19.813 INFO 3860 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時間:Sun Jul 28 16:02:19 CST 2019,收到請求,msg:testMsg1,delayType:1 2019-07-28 16:02:19.815 INFO 3860 --- [nio-8080-exec-9] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-o-qPpkWIkRm73DIrOIVhig identity=766339] started 2019-07-28 16:02:25.829 INFO 3860 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時間:Sun Jul 28 16:02:25 CST 2019,死信隊列A收到消息:testMsg1 2019-07-28 16:02:41.326 INFO 3860 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時間:Sun Jul 28 16:02:41 CST 2019,收到請求,msg:testMsg2,delayType:2 2019-07-28 16:03:41.329 INFO 3860 --- [ntContainer#0-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時間:Sun Jul 28 16:03:41 CST 2019,死信隊列B收到消息:testMsg2
第一條消息在6s后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在60s之后變成了死信消息,然后被消費(fèi)掉,這樣,一個還算ok的延時隊列就打造完成了。
不過,等等,如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個隊列,這里只有6s和60s兩個時間選項(xiàng),如果需要一個小時后處理,那么就需要增加TTL為一個小時的隊列,如果是預(yù)定會議室然后提前通知這樣的場景,豈不是要增加無數(shù)個隊列才能滿足需求??
嗯,仔細(xì)想想,事情并不簡單。
0|1七、RabbitMQ延時隊列優(yōu)化
顯然,需要一種更通用的方案才能滿足需求,那么就只能將TTL設(shè)置在消息屬性里了。我們來試一試。
增加一個延時隊列,用于接收設(shè)置為任意延時時長的消息,增加一個相應(yīng)的死信隊列和routingkey:
@Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec"; public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey"; public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec"; // 聲明延時Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 聲明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 聲明延時隊列C 不設(shè)置TTL // 并綁定到對應(yīng)的死信交換機(jī) @Bean("delayQueueC") public Queue delayQueueC(){ Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 這里聲明當(dāng)前隊列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY); return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build(); } // 聲明死信隊列C 用于接收延時任意時長處理的消息 @Bean("deadLetterQueueC") public Queue deadLetterQueueC(){ return new Queue(DEAD_LETTER_QUEUEC_NAME); } // 聲明延時列C綁定關(guān)系 @Bean public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY); } // 聲明死信隊列C綁定關(guān)系 @Bean public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY); } }
增加一個死信隊列C的消費(fèi)者:
@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME) public void receiveC(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時間:{},死信隊列C收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
再次啟動!然后訪問:http://localhost:8080/rabbitmq/delayMsg?msg=testMsg1delayTime=5000?來生產(chǎn)消息,注意這里的單位是毫秒。
2019-07-28 16:45:07.033 INFO 31468 --- [nio-8080-exec-4] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時間:Sun Jul 28 16:45:07 CST 2019,收到請求,msg:testMsg1,delayTime:5000 2019-07-28 16:45:11.694 INFO 31468 --- [nio-8080-exec-5] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時間:Sun Jul 28 16:45:11 CST 2019,收到請求,msg:testMsg2,delayTime:5000 2019-07-28 16:45:12.048 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時間:Sun Jul 28 16:45:12 CST 2019,死信隊列C收到消息:testMsg1 2019-07-28 16:45:16.709 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時間:Sun Jul 28 16:45:16 CST 2019,死信隊列C收到消息:testMsg2
看起來似乎沒什么問題,但不要高興的太早,在最開始的時候,就介紹過,如果使用在消息屬性上設(shè)置TTL的方式,消息可能并不會按時“死亡“,因?yàn)镽abbitMQ只會檢查第一個消息是否過期,如果過期則丟到死信隊列,索引如果第一個消息的延時時長很長,而第二個消息的延時時長很短,則第二個消息并不會優(yōu)先得到執(zhí)行。
實(shí)驗(yàn)一下:
2019-07-28 16:49:02.957 INFO 31468 --- [nio-8080-exec-8] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時間:Sun Jul 28 16:49:02 CST 2019,收到請求,msg:longDelayedMsg,delayTime:20000 2019-07-28 16:49:10.671 INFO 31468 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時間:Sun Jul 28 16:49:10 CST 2019,收到請求,msg:shortDelayedMsg,delayTime:2000 2019-07-28 16:49:22.969 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時間:Sun Jul 28 16:49:22 CST 2019,死信隊列C收到消息:longDelayedMsg 2019-07-28 16:49:22.970 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時間:Sun Jul 28 16:49:22 CST 2019,死信隊列C收到消息:shortDelayedMsg
我們先發(fā)了一個延時時長為20s的消息,然后發(fā)了一個延時時長為2s的消息,結(jié)果顯示,第二個消息會在等第一個消息成為死信后才會“死亡“。
0|1八、利用RabbitMQ插件實(shí)現(xiàn)延遲隊列
上文中提到的問題,確實(shí)是一個硬傷,如果不能實(shí)現(xiàn)在消息粒度上添加TTL,并使其在設(shè)置的TTL時間及時死亡,就無法設(shè)計成一個通用的延時隊列。
那如何解決這個問題呢?不要慌,安裝一個插件即可:Community Plugins — RabbitMQ?,下載rabbitmq_delayed_message_exchange插件,然后解壓放置到RabbitMQ的插件目錄。
接下來,進(jìn)入RabbitMQ的安裝目錄下的sbin目錄,執(zhí)行下面命令讓該插件生效,然后重啟RabbitMQ。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后,我們再聲明幾個Bean:
@Configuration public class DelayedRabbitMQConfig { public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue"; public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange"; public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey"; @Bean public Queue immediateQueue() { return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
controller層再添加一個入口:
@RequestMapping("delayMsg2") public void delayMsg2(String msg, Integer delayTime) { log.info("當(dāng)前時間:{},收到請求,msg:{},delayTime:{}", new Date(), msg, delayTime); sender.sendDelayMsg(msg, delayTime); }
消息生產(chǎn)者的代碼也需要修改:
public void sendDelayMsg(String msg, Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{ a.getMessageProperties().setDelay(delayTime); return a; }); }
最后,再創(chuàng)建一個消費(fèi)者:
@RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時間:{},延時隊列收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
一切準(zhǔn)備就緒,啟動!然后分別訪問以下鏈接:
http://localhost:8080/rabbitmq/delayMsg2?msg=msg1&delayTime=20000 http://localhost:8080/rabbitmq/delayMsg2?msg=msg2&delayTime=2000
日志如下:
2019-07-28 17:28:13.729 INFO 25804 --- [nio-8080-exec-2] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時間:Sun Jul 28 17:28:13 CST 2019,收到請求,msg:msg1,delayTime:20000 2019-07-28 17:28:20.607 INFO 25804 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時間:Sun Jul 28 17:28:20 CST 2019,收到請求,msg:msg2,delayTime:2000 2019-07-28 17:28:22.624 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時間:Sun Jul 28 17:28:22 CST 2019,延時隊列收到消息:msg2 2019-07-28 17:28:33.751 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時間:Sun Jul 28 17:28:33 CST 2019,延時隊列收到消息:msg1
第二個消息被先消費(fèi)掉了,符合預(yù)期。至此,RabbitMQ實(shí)現(xiàn)延時隊列的部分就完結(jié)了。
0|1九、總結(jié)
延時隊列在需要延時處理的場景下非常有用,使用RabbitMQ來實(shí)現(xiàn)延時隊列可以很好的利用RabbitMQ的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊列來保障消息至少被消費(fèi)一次以及未被正確處理的消息不會被丟棄。另外,通過RabbitMQ集群的特性,可以很好的解決單點(diǎn)故障問題,不會因?yàn)閱蝹€節(jié)點(diǎn)掛掉導(dǎo)致延時隊列不可用或者消息丟失。
當(dāng)然,延時隊列還有很多其它選擇,比如利用Java的DelayQueu,利用Redis的zset,利用Quartz或者利用kafka的時間輪,這些方式各有特點(diǎn),但就像爐石傳說一般,這些知識就好比手里的卡牌,知道的越多,可以用的卡牌也就越多,遇到問題便能游刃有余,所以需要大量的知識儲備和經(jīng)驗(yàn)積累才能打造出更出色的卡牌組合,讓自己解決問題的能力得到更好的提升。
但另一方面,隨著時間的流逝和閱歷的增長,越來越感覺到自己的能力有限,無法獨(dú)自面對紛繁復(fù)雜且多變的業(yè)務(wù)需求,在很多方面需要其他人的協(xié)助才能很好的完成任務(wù)。也知道聞道有先后,術(shù)業(yè)有專攻,不會再狂妄自大,覺得自己能把所有事情都搞定,也將重心慢慢轉(zhuǎn)移到研究如何有效的進(jìn)行團(tuán)隊合作上來,我相信一個高度協(xié)調(diào)的團(tuán)隊永遠(yuǎn)比一個人戰(zhàn)斗要更有價值。
花了一個周末的時間完成了這篇文章,文中所有的代碼都上傳到了github,https://github.com/MFrank2016/delayed-queue-demo如有需要可以自行查閱,希望能對你有幫助,如果有錯誤的地方,歡迎指正,也歡迎關(guān)注我的公眾號進(jìn)行留言交流。
總結(jié)
以上是生活随笔為你收集整理的【RabbitMQ】一文带你搞定RabbitMQ延迟队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 别克4s保养一次车多少钱?
- 下一篇: RabbitMQ自学之路(九)——Rab