RabbitMQ的消息确认ACK机制
1、什么是消息確認ACK。
答:如果在處理消息的過程中,消費者的服務器在處理消息的時候出現異常,那么可能這條正在處理的消息就沒有完成消息消費,數據就會丟失。為了確保數據不會丟失,RabbitMQ支持消息確定-ACK。
2、ACK的消息確認機制。
答:ACK機制是消費者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊列中刪除。
如果一個消費者在處理消息出現了網絡不穩定、服務器異常等現象,那么就不會有ACK反饋,RabbitMQ會認為這個消息沒有正常消費,會將消息重新放入隊列中。
如果在集群的情況下,RabbitMQ會立即將這個消息推送給這個在線的其他消費者。這種機制保證了在消費者服務端故障的時候,不丟失任何消息和任務。
消息永遠不會從RabbitMQ中刪除,只有當消費者正確發送ACK反饋,RabbitMQ確認收到后,消息才會從RabbitMQ服務器的數據中刪除。
消息的ACK確認機制默認是打開的。
3、ACK機制的開發注意事項。
答:如果忘記了ACK,那么后果很嚴重。當Consumer退出時候,Message會一直重新分發。然后RabbitMQ會占用越來越多的內容,由于RabbitMQ會長時間運行,因此這個"內存泄漏"是致命的。
4、結合項目實例進行,理解一下ACK機制。之前寫過RabbitMQ的交換器Exchange之direct(發布與訂閱 完全匹配),這里借助這個進行消息持久化測試。生產者的代碼不發生改變??刂茖拥挠|發生產者生產消息,這里只生產一條消息。方便觀察現象。
1 package com.example.bie.controller;2 3 import org.springframework.beans.factory.annotation.Autowired;4 import org.springframework.stereotype.Controller;5 import org.springframework.web.bind.annotation.RequestMapping;6 import org.springframework.web.bind.annotation.ResponseBody;7 8 import com.example.bie.provider.RabbitMqLogErrorProduce;9 import com.example.bie.provider.RabbitMqLogInfoProduce; 10 11 /** 12 * 13 * @author biehl 14 * 15 */ 16 @Controller 17 public class RabbitmqController { 18 19 @Autowired 20 private RabbitMqLogInfoProduce rabbitMqLogInfoProduce; 21 22 @Autowired 23 private RabbitMqLogErrorProduce rabbitMqLogErrorProduce; 24 25 @RequestMapping(value = "/logInfo") 26 @ResponseBody 27 public String rabbitmqSendLogInfoMessage() { 28 String msg = "生產者===>生者的LogInfo消息message: "; 29 for (int i = 0; i < 1; i++) { 30 rabbitMqLogInfoProduce.producer(msg + i); 31 } 32 return "生產===> LogInfo消息message ===> success!!!"; 33 } 34 35 @RequestMapping(value = "/logError") 36 @ResponseBody 37 public String rabbitmqSendLogErrorMessage() { 38 String msg = "生產者===>生者的LogError消息message: "; 39 for (int i = 0; i < 1; i++) { 40 rabbitMqLogErrorProduce.producer(msg + i); 41 } 42 return "生產===> LogError消息message ===> success!!!"; 43 } 44 45 }消費者消費消息,打印輸出后面手動拋出運行時異常,觀察現象。
1 package com.example.bie.consumer;2 3 import org.springframework.amqp.core.ExchangeTypes;4 import org.springframework.amqp.rabbit.annotation.Exchange;5 import org.springframework.amqp.rabbit.annotation.Queue;6 import org.springframework.amqp.rabbit.annotation.QueueBinding;7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;8 import org.springframework.amqp.rabbit.annotation.RabbitListener;9 import org.springframework.stereotype.Component; 10 11 /** 12 * 13 * @author biehl 14 * 15 * 消息接收者 16 * 17 * 1、@RabbitListener bindings:綁定隊列 18 * 19 * 2、@QueueBinding 20 * value:綁定隊列的名稱、exchange:配置交換器、key:路由鍵routing-key綁定隊列和交換器 21 * 22 * 3、@Queue value:配置隊列名稱、autoDelete:是否是一個可刪除的臨時隊列 23 * 24 * 4、@Exchange value:為交換器起個名稱、type:指定具體的交換器類型 25 * 26 * 27 */ 28 @Component 29 @RabbitListener(bindings = @QueueBinding( 30 31 value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"), 32 33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT), 34 35 key = "${rabbitmq.config.queue.error.routing.key}")) 36 public class LogErrorConsumer { 37 38 /** 39 * 接收消息的方法,采用消息隊列監聽機制. 40 * 41 * @RabbitHandler意思是將注解@RabbitListener配置到類上面 42 * 43 * @RabbitHandler是指定這個方法可以進行消息的接收并且消費. 44 * 45 * @param msg 46 */ 47 @RabbitHandler 48 public void consumer(String msg) { 49 // 打印消息 50 System.out.println("ERROR消費者===>消費<===消息message: " + msg); 51 throw new RuntimeException(); 52 } 53 54 }觀察現象,如下所示:
在RabbitMQ的瀏覽器界面,可以看到一條消息未被進行ACK的消息確認機制,這條消息被鎖定Unacked,所以一直在控制臺進行報錯。
控制臺效果如下所示,一直進行消息的發送,因為消費方一直沒有返回ACK確認,RabbitMQ認為消息未進行正常的消費,會將消息再次放入到隊列中,再次讓你消費,但是還是沒有返回ACK確認,依次循環,形成了死循環。
如何解決問題呢,如果消息發送的時候,程序出現異常,后果很嚴重的,會導致內存泄漏的,所以在程序處理中可以進行異常捕獲,保證消費者的程序正常執行,這里不進行介紹了。第二種方式可以使用RabbitMQ的ack確認機制。開啟重試,然后重試次數,默認為3次。這里設置為5次。
1 # 給當前項目起名稱.2 spring.application.name=rabbitmq-ack-direct-consumer3 4 # 配置端口號5 server.port=80806 7 # 配置rabbitmq的參數.8 # rabbitmq服務器的ip地址.9 spring.rabbitmq.host=192.168.110.133 10 # rabbitmq的端口號5672,區別于瀏覽器訪問界面的15672端口號. 11 spring.rabbitmq.port=5672 12 # rabbitmq的賬號. 13 spring.rabbitmq.username=guest 14 # rabbitmq的密碼. 15 spring.rabbitmq.password=guest 16 17 # 設置交換器的名稱,方便修改. 18 # 路由鍵是將交換器和隊列進行綁定的,隊列通過路由鍵綁定到交換器. 19 rabbitmq.config.exchange=log.exchange.direct 20 21 # info級別的隊列名稱. 22 rabbitmq.config.queue.info=log.info.queue 23 # info的路由鍵. 24 rabbitmq.config.queue.info.routing.key=log.info.routing.key 25 26 # error級別的隊列名稱. 27 rabbitmq.config.queue.error=log.error.queue 28 # error的路由鍵. 29 rabbitmq.config.queue.error.routing.key=log.error.routing.key 30 31 # 開啟重試 32 spring.rabbitmq.listener.simple.retry.enabled=true 33 # 重試次數,默認為3次 34 spring.rabbitmq.listener.simple.retry.max-attempts=5效果如下所示:
可以看到控制臺嘗試了5次以后就不再進行重試了。
RabbitMQ的界面可以看到,開始的效果和上面的一致,但是5次嘗試以后,就變成了0條。RabbitMQ將這條消息丟棄了。
來源:https://www.cnblogs.com/biehongli/p/11789098.html
總結
以上是生活随笔為你收集整理的RabbitMQ的消息确认ACK机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 寒食节是几月几日要干什么(寒食节是几月几
- 下一篇: 世界头号发电大国,中国与美国的差距有多大