Rabbitmq消息发送事务与确认机制
2019獨角獸企業重金招聘Python工程師標準>>>
默認情況下發送消息的操作是不會返回任何信息給生產者的,也就是默認情況下生產者是不知道消息有沒有正確地到達服務器。如果在消息到達服務器之前己經丟失,持久化操作也解決不了這個問題,因為消息根本沒有到達服務器 ,何談持久化?
RabbitMQ針對這個問題,提供了兩種解決方式:
1、通過事務機制實現。
2、通過發送方確認 publisher confirm 機制實現。
1、事務機制
開啟事務后,客戶端和RabbitMQ之間的通訊交互流程:
- 客戶端發送給服務器Tx.Select(開啟事務模式)
- 服務器端返回Tx.Select-Ok(開啟事務模式ok)
- 推送消息
- 客戶端發送給事務提交Tx.Commit
- 服務器端返回Tx.Commit-Ok
以上就完成了事務的交互流程,如果其中任意一個環節出現問題,就會拋出IoException移除,這樣用戶就可以攔截異常進行事務回滾,或決定要不要重復消息。
聲明交換機、隊列并綁定
/*** 1、聲明交換機、隊列并綁定*/@org.junit.Testpublic void decalreExchange() throws Exception {String exchange = "hello_tx";// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為directchannel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT,true,false,false,new HashMap<>());String queueName = "hello_tx_c";// 聲明隊列channel.queueDeclare(queueName, true, false, false, null);// 綁定隊列到交換機String routingKey = "aaa";channel.queueBind(queueName, exchange, routingKey,null);}發送消息:這里通過 1/0 來產生異常
/*** 生產者發送消息* @throws Exception*/@org.junit.Testpublic void sendMessage() throws Exception {String exchange = "hello_tx";// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 消息內容try {channel.txSelect();//開啟事務String message1 = "Less is more tx ";// 發布消息到Exchange 指定路由鍵channel.basicPublish(exchange, "aaa", MessageProperties.PERSISTENT_TEXT_PLAIN, message1.getBytes());int i = 1/0;channel.confirmSelect();//提交事務}catch (Exception e) {log.error("error:",e);channel.txRollback();//回滾}channel.close();connection.close();}可以看到,發送方出現異常,消息并沒有發送到rabbitmq的隊列里。
?
那么,既然已經有事務了,為何還要使用發送方確認模式呢,原因是因為事務的性能是非常差的。根據相關資料,事務會降低2~10倍的性能。
?
2、發送方確認模式
基于事務的性能問題,RabbitMQ團隊為我們拿出了更好的方案,即采用發送方確認模式,該模式比事務更輕量,性能影響幾乎可以忽略不計。?
原理:生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),由這個id在生產者和RabbitMQ之間進行消息的確認。
confirm模式最大的好處在于他可以是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息決定下一步的處理。
?
如何使用
首先聲明交換機、隊列并綁定
/*** 1、聲明交換機、隊列并綁定*/@org.junit.Testpublic void decalreExchange() throws Exception {String exchange = "hello_confirm";// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為directchannel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());String queueName = "hello_confirm_c";// 聲明隊列channel.queueDeclare(queueName, true, false, false, null);// 綁定隊列到交換機String routingKey = "aaa";channel.queueBind(queueName, exchange, routingKey, null);}?
通過下面的代碼,進行測試
/*** 確認發送1條消息** @throws Exception*/@org.junit.Testpublic void sendMessage1() throws Exception {String exchange = "hello_confirm";// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();channel.confirmSelect();// 消息內容String message1 = "Less is more confirm ";// 發布消息到Exchange 指定路由鍵channel.basicPublish(exchange, "aaa", null, message1.getBytes());if (channel.waitForConfirms()){//等待回復log.debug("發送成功");}else{log.debug("發送失敗");}channel.close();connection.close();}/*** 批量確認發送消息** @throws Exception*/@org.junit.Testpublic void sendMessage2() throws Exception {String exchange = "hello_confirm";// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();channel.confirmSelect();for (int i= 1;i <= 3; i++){// 發布消息到Exchange 指定路由鍵// 消息內容String message = "Less is more confirm " + i;channel.basicPublish(exchange, "aaa", null, message.getBytes());}if (channel.waitForConfirms()){//批量確認log.debug("發送成功");}else{log.debug("發送失敗");}channel.close();connection.close();}/*** 添加確認監聽器** @throws Exception*/@org.junit.Testpublic void sendMessage3() throws Exception {String exchange = "hello_confirm";// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple)throws IOException {log.debug("deliveryTag:{},multiple:{}",deliveryTag,multiple);}public void handleNack(long deliveryTag, boolean multiple)throws IOException {}});// 發布消息到Exchange 指定路由鍵for (int i= 1;i <= 3; i++){// 發布消息到Exchange 指定路由鍵// 消息內容String message = "Less is more confirm " + i;channel.basicPublish(exchange, "aaa", null, message.getBytes());}if (channel.waitForConfirms()){//批量確認log.debug("發送成功");}else{log.debug("發送失敗");}channel.close();connection.close();}?
詳細源碼地址
https://github.com/suzhe2018/rabbitmq-item
轉載于:https://my.oschina.net/suzheworld/blog/3003370
總結
以上是生活随笔為你收集整理的Rabbitmq消息发送事务与确认机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 华夏幸福产业研究院顾强:从极限通勤看都市
- 下一篇: 2018 German Collegia