rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK)
生活随笔
收集整理的這篇文章主要介紹了
rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
默認情況下如果一個 Message 被消費者所正確接收則會被從 Queue 中移除
如果一個 Queue 沒被任何消費者訂閱,那么這個 Queue 中的消息會被 Cache(緩存),當有消費者訂閱時則會立即發送,當 Message 被消費者正確接收時,就會被從 Queue 中移除。
消息發送確認
發送的消息怎么樣才算失敗或成功?如何確認?
- 當消息無法路由到隊列時,確認消息路由失敗。消息成功路由時,當需要發送的隊列都發送成功后,進行確認消息,對于持久化隊列意味著寫入磁盤,對于鏡像隊列意味著所有鏡像接收成功
ConfirmCallback
- 通過實現 ConfirmCallback 接口,消息發送到 Broker 后觸發回調,確認消息是否到達 Broker 服務器,也就是只確認是否正確到達 Exchange 中
還需要在配置文件添加配置
spring:rabbitmq:publisher-confirms: trueReturnCallback
- 通過實現 ReturnCallback 接口,啟動消息失敗返回,比如路由不到隊列時觸發回調
還需要在配置文件添加配置
spring:rabbitmq:publisher-returns: true消息接收確認
消息消費者如何通知 Rabbit 消息消費成功?
- 消息通過 ACK 確認是否被正確接收,每個 Message 都要被確認(acknowledged),可以手動去 ACK 或自動 ACK
- 自動確認會在消息發送給消費者后立即確認,但存在丟失消息的可能,如果消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那么就相當于丟失了消息
- 如果消息已經被處理,但后續代碼拋出異常,使用 Spring 進行管理的話消費端業務邏輯會進行回滾,這也同樣造成了實際意義的消息丟失
- 如果手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確認可以在業務失敗后進行一些操作,如果消息未被 ACK 則會發送到下一個消費者
- 如果某個服務忘記 ACK 了,則 RabbitMQ 不會再發送數據給它,因為 RabbitMQ 認為該服務的處理能力有限
- ACK 機制還可以起到限流作用,比如在接收到某條消息時休眠幾秒鐘
- 消息確認模式有:
- AcknowledgeMode.NONE:自動確認
- AcknowledgeMode.AUTO:根據情況確認
- AcknowledgeMode.MANUAL:手動確認
確認消息(局部方法處理消息)
- 默認情況下消息消費者是自動 ack (確認)消息的,如果要手動 ack(確認)則需要修改確認模式為 manual
或在 RabbitListenerContainerFactory 中進行開啟手動 ack
@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啟手動 ackreturn factory; }確認消息
@RabbitHandler public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {System.out.println(message);try {channel.basicAck(tag,false); // 確認消息} catch (IOException e) {e.printStackTrace();} }- 需要注意的 basicAck 方法需要傳遞兩個參數
- deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識 ID,是一個單調遞增的正整數,delivery tag 的范圍僅限于 Channel
- multiple:為了減少網絡流量,手動確認可以被批處理,當該參數為 true 時,則可以一次性確認 delivery_tag 小于等于傳入值的所有消息
手動否認、拒絕消息
- 發送一個 header 中包含 error 屬性的消息
消費者獲取消息時檢查到頭部包含 error 則 nack 消息
@RabbitHandler public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {System.out.println(message);if (map.get("error")!= null){System.out.println("錯誤的消息");try {channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); //否認消息return;} catch (IOException e) {e.printStackTrace();}}try {channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //確認消息} catch (IOException e) {e.printStackTrace();} }- 此時控制臺重復打印,說明該消息被 nack 后一直重新入隊列然后一直重新消費
hello 錯誤的消息
hello 錯誤的消息
hello 錯誤的消息
hello 錯誤的消息
也可以拒絕該消息,消息會被丟棄,不會重回隊列
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //拒絕消息確認消息(全局處理消息)
- 自動確認涉及到一個問題就是如果在處理消息的時候拋出異常,消息處理失敗,但是因為自動確認而導致 Rabbit 將該消息刪除了,造成消息丟失
手動確認消息
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("consumer_queue"); // 監聽的隊列container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手動確認container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { //消息處理System.out.println("====接收到消息=====");System.out.println(new String(message.getBody()));if(message.getMessageProperties().getHeaders().get("error") == null){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("消息已經確認");}else {//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);System.out.println("消息拒絕");}});return container; }AcknowledgeMode 除了 NONE 和 MANUAL 之外還有 AUTO ,它會根據方法的執行情況來決定是否確認還是拒絕(是否重新入queue)
- 如果消息成功被消費(成功的意思是在消費的過程中沒有拋出異常),則自動確認
- 當拋出 AmqpRejectAndDontRequeueException 異常的時候,則消息會被拒絕,且 requeue = false(不重新入隊列)
- 當拋出 ImmediateAcknowledgeAmqpException 異常,則消費者會被確認
- 其他的異常,則消息會被拒絕,且 requeue = true(如果此時只有一個消費者監聽該隊列,則有發生死循環的風險,多消費端也會造成資源的極大浪費,這個在開發過程中一定要避免的)。可以通過 setDefaultRequeueRejected(默認是true)去設置
消息可靠總結
- 持久化
- exchange要持久化
- queue要持久化
- message要持久化
- 消息確認
- 啟動消費返回(@ReturnList注解,生產者就可以知道哪些消息沒有發出去)
- 生產者和Server(broker)之間的消息確認
- 消費者和Server(broker)之間的消息確認
總結
以上是生活随笔為你收集整理的rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 鱼香日本豆腐的做法图解?
- 下一篇: m40型工业机器人_工业机器人在汽车生产