RabbitMQ事务和Confirm发送方消息确认——深入解读
引言
根據(jù)前面的知識(深入了解RabbitMQ工作原理及簡單使用、Rabbit的幾種工作模式介紹與實踐)我們知道,如果要保證消息的可靠性,需要對消息進行持久化處理,然而消息持久化除了需要代碼的設(shè)置之外,還有一個重要步驟是至關(guān)重要的,那就是保證你的消息順利進入Broker(代理服務(wù)器),如圖所示:
正常情況下,如果消息經(jīng)過交換器進入隊列就可以完成消息的持久化,但如果消息在沒有到達broker之前出現(xiàn)意外,那就造成消息丟失,有沒有辦法可以解決這個問題?
RabbitMQ有兩種方式來解決這個問題:
一、事務(wù)使用
事務(wù)的實現(xiàn)主要是對信道(Channel)的設(shè)置,主要的方法有三個:
channel.txSelect()聲明啟動事務(wù)模式;
channel.txComment()提交事務(wù);
channel.txRollback()回滾事務(wù);
從上面的可以看出事務(wù)都是以tx開頭的,tx應(yīng)該是transaction extend(事務(wù)擴展模塊)的縮寫,如果有準確的解釋歡迎在博客下留言。
我們來看具體的代碼實現(xiàn):
// 創(chuàng)建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創(chuàng)建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(_queueName, true, false, false, null); String message = String.format("時間 => %s", new Date().getTime()); try {channel.txSelect(); // 聲明事務(wù)// 發(fā)送消息channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));channel.txCommit(); // 提交事務(wù) } catch (Exception e) {channel.txRollback(); } finally {channel.close();conn.close(); }注意:用戶需把config.xx配置成自己Rabbit的信息。
從上面的代碼我們可以看出,在發(fā)送消息之前的代碼和之前介紹的都是一樣的,只是在發(fā)送消息之前,需要聲明channel為事務(wù)模式,提交或者回滾事務(wù)即可。
了解了事務(wù)的實現(xiàn)之后,那么事務(wù)究竟是怎么執(zhí)行的,讓我們來使用wireshark抓個包看看,如圖所示:
輸入ip.addr==rabbitip && amqp查看客戶端和rabbit之間的通訊,可以看到交互流程:
- 客戶端發(fā)送給服務(wù)器Tx.Select(開啟事務(wù)模式)
- 服務(wù)器端返回Tx.Select-Ok(開啟事務(wù)模式ok)
- 推送消息
- 客戶端發(fā)送給事務(wù)提交Tx.Commit
- 服務(wù)器端返回Tx.Commit-Ok
以上就完成了事務(wù)的交互流程,如果其中任意一個環(huán)節(jié)出現(xiàn)問題,就會拋出IoException移除,這樣用戶就可以攔截異常進行事務(wù)回滾,或決定要不要重復(fù)消息。
那么,既然已經(jīng)有事務(wù)了,沒什么還要使用發(fā)送方確認模式呢,原因是因為事務(wù)的性能是非常差的。事務(wù)性能測試:
事務(wù)模式,結(jié)果如下:
- 事務(wù)模式,發(fā)送1w條數(shù)據(jù),執(zhí)行花費時間:14197s
- 事務(wù)模式,發(fā)送1w條數(shù)據(jù),執(zhí)行花費時間:13597s
- 事務(wù)模式,發(fā)送1w條數(shù)據(jù),執(zhí)行花費時間:14216s
非事務(wù)模式,結(jié)果如下:
- 非事務(wù)模式,發(fā)送1w條數(shù)據(jù),執(zhí)行花費時間:101s
- 非事務(wù)模式,發(fā)送1w條數(shù)據(jù),執(zhí)行花費時間:77s
- 非事務(wù)模式,發(fā)送1w條數(shù)據(jù),執(zhí)行花費時間:106s
從上面可以看出,非事務(wù)模式的性能是事務(wù)模式的性能高149倍,我的電腦測試是這樣的結(jié)果,不同的電腦配置略有差異,但結(jié)論是一樣的,事務(wù)模式的性能要差很多,那有沒有既能保證消息的可靠性又能兼顧性能的解決方案呢?那就是接下來要講的Confirm發(fā)送方確認模式。
擴展知識
我們知道,消費者可以使用消息自動或手動發(fā)送來確認消費消息,那如果我們在消費者模式中使用事務(wù)(當然如果使用了手動確認消息,完全用不到事務(wù)的),會發(fā)生什么呢?
消費者模式使用事務(wù)
假設(shè)消費者模式中使用了事務(wù),并且在消息確認之后進行了事務(wù)回滾,那么RabbitMQ會產(chǎn)生什么樣的變化?
結(jié)果分為兩種情況:
二、Confirm發(fā)送方確認模式
Confirm發(fā)送方確認模式使用和事務(wù)類似,也是通過設(shè)置Channel進行發(fā)送方確認的。
Confirm的三種實現(xiàn)方式:
方式一:channel.waitForConfirms()普通發(fā)送方確認模式;
方式二:channel.waitForConfirmsOrDie()批量確認模式;
方式三:channel.addConfirmListener()異步監(jiān)聽發(fā)送方確認模式;
方式一:普通Confirm模式
// 創(chuàng)建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創(chuàng)建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啟發(fā)送方確認模式 channel.confirmSelect(); String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); if (channel.waitForConfirms()) {System.out.println("消息發(fā)送成功" ); }看代碼可以知道,我們只需要在推送消息之前,channel.confirmSelect()聲明開啟發(fā)送方確認模式,再使用channel.waitForConfirms()等待消息被服務(wù)器確認即可。
方式二:批量Confirm模式
// 創(chuàng)建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創(chuàng)建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啟發(fā)送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) {String message = String.format("時間 => %s", new Date().getTime());channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } channel.waitForConfirmsOrDie(); //直到所有信息都發(fā)布,只要有一個未確認就會IOException System.out.println("全部執(zhí)行完成");以上代碼可以看出來channel.waitForConfirmsOrDie(),使用同步方式等所有的消息發(fā)送之后才會執(zhí)行后面代碼,只要有一個消息未被確認就會拋出IOException異常。
方式三:異步Confirm模式
// 創(chuàng)建連接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 創(chuàng)建信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啟發(fā)送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) {String message = String.format("時間 => %s", new Date().getTime());channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } //異步監(jiān)聽確認和未確認的消息 channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("未確認消息,標識:" + deliveryTag);}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple));} });異步模式的優(yōu)點,就是執(zhí)行效率高,不需要等待消息執(zhí)行完,只需要監(jiān)聽消息即可,以上異步返回的信息如下:
可以看出,代碼是異步執(zhí)行的,消息確認有可能是批量確認的,是否批量確認在于返回的multiple的參數(shù),此參數(shù)為bool值,如果true表示批量執(zhí)行了deliveryTag這個值以前的所有消息,如果為false的話表示單條確認。
Confirm性能測試
測試前提:與事務(wù)一樣,我們發(fā)送1w條消息。
方式一:Confirm普通模式
- 執(zhí)行花費時間:2253s
- 執(zhí)行花費時間:2018s
- 執(zhí)行花費時間:2043s
方式二:Confirm批量模式
- 執(zhí)行花費時間:1576s
- 執(zhí)行花費時間:1400s
- 執(zhí)行花費時間:1374s
方式三:Confirm異步監(jiān)聽方式
- 執(zhí)行花費時間:1498s
- 執(zhí)行花費時間:1368s
- 執(zhí)行花費時間:1363s
總結(jié)
綜合總體測試情況來看:Confirm批量確定和Confirm異步模式性能相差不大,Confirm模式要比事務(wù)快10倍左右。
長按二維碼關(guān)注我的技術(shù)公眾號
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ事务和Confirm发送方消息确认——深入解读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 面试官 | 如何优雅的设计Java 异常
- 下一篇: 阿里巴巴为什么禁止使用Apache Be