深入解读RabbitMQ工作原理及简单使用
深入解讀RabbitMQ工作原理及簡單使用
RabbitMQ系列目錄
RabbitMQ簡介
在介紹RabbitMQ之前實(shí)現(xiàn)要介紹一下MQ,MQ是什么?
MQ全稱是Message Queue,可以理解為消息隊(duì)列的意思,簡單來說就是消息以管道的方式進(jìn)行傳遞。
RabbitMQ是一個(gè)實(shí)現(xiàn)了AMQP(Advanced Message Queuing Protocol)高級消息隊(duì)列協(xié)議的消息隊(duì)列服務(wù),用Erlang語言的。
使用場景
在我們秒殺搶購商品的時(shí)候,系統(tǒng)會(huì)提醒我們稍等排隊(duì)中,而不是像幾年前一樣頁面卡死或報(bào)錯(cuò)給用戶。
像這種排隊(duì)結(jié)算就用到了消息隊(duì)列機(jī)制,放入通道里面一個(gè)一個(gè)結(jié)算處理,而不是某個(gè)時(shí)間斷突然涌入大批量的查詢新增把數(shù)據(jù)庫給搞宕機(jī),所以RabbitMQ本質(zhì)上起到的作用就是削峰填谷,為業(yè)務(wù)保駕護(hù)航。
為什么選擇RabbitMQ
現(xiàn)在的市面上有很多MQ可以選擇,比如ActiveMQ、ZeroMQ、Appche Qpid,那問題來了為什么要選擇RabbitMQ?
工作機(jī)制
生產(chǎn)者、消費(fèi)者和代理
在了解消息通訊之前首先要了解3個(gè)概念:生產(chǎn)者、消費(fèi)者和代理。
生產(chǎn)者:消息的創(chuàng)建者,負(fù)責(zé)創(chuàng)建和推送數(shù)據(jù)到消息服務(wù)器;
消費(fèi)者:消息的接收方,用于處理數(shù)據(jù)和確認(rèn)消息;
代理:就是RabbitMQ本身,用于扮演“快遞”的角色,本身不生產(chǎn)消息,只是扮演“快遞”的角色。
消息發(fā)送原理
首先你必須連接到Rabbit才能發(fā)布和消費(fèi)消息,那怎么連接和發(fā)送消息的呢?
你的應(yīng)用程序和Rabbit Server之間會(huì)創(chuàng)建一個(gè)TCP連接,一旦TCP打開,并通過了認(rèn)證,認(rèn)證就是你試圖連接Rabbit之前發(fā)送的Rabbit服務(wù)器連接信息和用戶名和密碼,有點(diǎn)像程序連接數(shù)據(jù)庫,使用Java有兩種連接認(rèn)證的方式,后面代碼會(huì)詳細(xì)介紹,一旦認(rèn)證通過你的應(yīng)用程序和Rabbit就創(chuàng)建了一條AMQP信道(Channel)。
信道是創(chuàng)建在“真實(shí)”TCP上的虛擬連接,AMQP命令都是通過信道發(fā)送出去的,每個(gè)信道都會(huì)有一個(gè)唯一的ID,不論是發(fā)布消息,訂閱隊(duì)列或者介紹消息都是通過信道完成的。
為什么不通過TCP直接發(fā)送命令?
對于操作系統(tǒng)來說創(chuàng)建和銷毀TCP會(huì)話是非常昂貴的開銷,假設(shè)高峰期每秒有成千上萬條連接,每個(gè)連接都要?jiǎng)?chuàng)建一條TCP會(huì)話,這就造成了TCP連接的巨大浪費(fèi),而且操作系統(tǒng)每秒能創(chuàng)建的TCP也是有限的,因此很快就會(huì)遇到系統(tǒng)瓶頸。
如果我們每個(gè)請求都使用一條TCP連接,既滿足了性能的需要,又能確保每個(gè)連接的私密性,這就是引入信道概念的原因。
你必須知道的Rabbit
想要真正的了解Rabbit有些名詞是你必須知道的。
包括:ConnectionFactory(連接管理器)、Channel(信道)、Exchange(交換器)、Queue(隊(duì)列)、RoutingKey(路由鍵)、BindingKey(綁定鍵)。
ConnectionFactory(連接管理器):應(yīng)用程序與Rabbit之間建立連接的管理器,程序代碼中使用;
Channel(信道):消息推送使用的通道;
Exchange(交換器):用于接受、分配消息;
Queue(隊(duì)列):用于存儲(chǔ)生產(chǎn)者的消息;
RoutingKey(路由鍵):用于把生成者的數(shù)據(jù)分配到交換器上;
BindingKey(綁定鍵):用于把交換器的消息綁定到隊(duì)列上;
看到上面的解釋,最難理解的路由鍵和綁定鍵了,那么他們具體怎么發(fā)揮作用的,請看下圖:
關(guān)于更多交換器的信息,我們在后面再講。
消息持久化
Rabbit隊(duì)列和交換器有一個(gè)不可告人的秘密,就是默認(rèn)情況下重啟服務(wù)器會(huì)導(dǎo)致消息丟失,那么怎么保證Rabbit在重啟的時(shí)候不丟失呢?答案就是消息持久化。
當(dāng)你把消息發(fā)送到Rabbit服務(wù)器的時(shí)候,你需要選擇你是否要進(jìn)行持久化,但這并不能保證Rabbit能從崩潰中恢復(fù),想要Rabbit消息能恢復(fù)必須滿足3個(gè)條件:
投遞消息的時(shí)候durable設(shè)置為true,消息持久化;
消息已經(jīng)到達(dá)持久化交換器上;
消息已經(jīng)到達(dá)持久化的隊(duì)列;
持久化工作原理
Rabbit會(huì)將你的持久化消息寫入磁盤上的持久化日志文件,等消息被消費(fèi)之后,Rabbit會(huì)把這條消息標(biāo)識(shí)為等待垃圾回收。
持久化的缺點(diǎn)
消息持久化的優(yōu)點(diǎn)顯而易見,但缺點(diǎn)也很明顯,那就是性能,因?yàn)橐獙懭胗脖P要比寫入內(nèi)存性能較低很多,從而降低了服務(wù)器的吞吐量,盡管使用SSD硬盤可以使事情得到緩解,但他仍然吸干了Rabbit的性能,當(dāng)消息成千上萬條要寫入磁盤的時(shí)候,性能是很低的。
所以使用者要根據(jù)自己的情況,選擇適合自己的方式。
虛擬主機(jī)
每個(gè)Rabbit都能創(chuàng)建很多vhost,我們稱之為虛擬主機(jī),每個(gè)虛擬主機(jī)其實(shí)都是mini版的RabbitMQ,擁有自己的隊(duì)列,交換器和綁定,擁有自己的權(quán)限機(jī)制。
環(huán)境搭建
前文我們已經(jīng)介紹了Ubuntu搭建RabbitMQ的步驟:RabbitMQ在Ubuntu上的環(huán)境搭建
如果你是在Windows10上去安裝那就更簡單了,先放下載地址:
Erlang/Rabbit Server百度網(wǎng)盤鏈接:https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg 密碼:wct9
當(dāng)然也可去Erlang和Rabbit官網(wǎng)去下,就是速度比較慢。我的百度云Rabbit最新版本:3.7.6,Erlang版本:20.2,注意:不要下載最新的Erlang,在Windows10上打開擴(kuò)展插件有問題,打不開。
安裝Erlang;
安裝Rabbit Server;
進(jìn)入安裝目錄\sbin下,使用命令“rabbitmq-plugins enable rabbitmq_management”啟動(dòng)網(wǎng)頁管理插件;
重啟Rabbit服務(wù);
使用:http://localhost:15672進(jìn)行測試,默認(rèn)的登陸賬號為:guest,密碼為:guest
重復(fù)安裝Rabbit Server的坑
如果不是第一次在Windows上安裝Rabbit Server一定要把Rabbit和Erlang卸載干凈之后,找到注冊表:HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 刪除其下的所有項(xiàng)。
不然會(huì)出現(xiàn)Rabbit安裝之后啟動(dòng)不了的情況,理論上卸載的順序也是先Rabbit在Erlang。
代碼實(shí)現(xiàn)
java版實(shí)現(xiàn),使用maven項(xiàng)目,創(chuàng)建可以查看:MyEclipse2017破解設(shè)置與maven項(xiàng)目搭建
項(xiàng)目創(chuàng)建成功之后,添加Rabbit Client jar包,只需要在pom.xml里面配置,如下信息:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>4.7.0</version> </dependency>java實(shí)現(xiàn)代碼分為兩個(gè)類,第一個(gè)是創(chuàng)建Rabbit連接,第二是應(yīng)用類使用最簡單的方式發(fā)布和消費(fèi)消息。
Rabbit的連接,兩種方式:
方式一:
public static Connection GetRabbitConnection() {ConnectionFactory factory = new ConnectionFactory();factory.setUsername(Config.UserName);factory.setPassword(Config.Password);factory.setVirtualHost(Config.VHost);factory.setHost(Config.Host);factory.setPort(Config.Port);Connection conn = null;try {conn = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return conn; }方式二:
public static Connection GetRabbitConnection2() {ConnectionFactory factory = new ConnectionFactory();// 連接格式:amqp://userName:password@hostName:portNumber/virtualHostString uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port,Config.VHost);Connection conn = null;try {factory.setUri(uri);factory.setVirtualHost(Config.VHost);conn = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return conn; }第二部分:應(yīng)用類,使用最簡單的方式發(fā)布和消費(fèi)消息
public static void main(String[] args) {Publisher(); // 推送消息Consumer(); // 消費(fèi)消息 }/*** 推送消息*/ public static void Publisher() {// 創(chuàng)建一個(gè)連接Connection conn = ConnectionFactoryUtil.GetRabbitConnection();if (conn != null) {try {// 創(chuàng)建通道Channel channel = conn.createChannel();// 聲明隊(duì)列【參數(shù)說明:參數(shù)一:隊(duì)列名稱,參數(shù)二:是否持久化;參數(shù)三:是否獨(dú)占模式;參數(shù)四:消費(fèi)者斷開連接時(shí)是否刪除隊(duì)列;參數(shù)五:消息其他參數(shù)】channel.queueDeclare(Config.QueueName, false, false, false, null);String content = String.format("當(dāng)前時(shí)間:%s", new Date().getTime());// 發(fā)送內(nèi)容【參數(shù)說明:參數(shù)一:交換機(jī)名稱;參數(shù)二:隊(duì)列名稱,參數(shù)三:消息的其他屬性;參數(shù)四:消息主體】channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));System.out.println("已發(fā)送消息:" + content);// 關(guān)閉連接channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}} }/*** 消費(fèi)消息*/ public static void Consumer() {// 創(chuàng)建一個(gè)連接Connection conn = ConnectionFactoryUtil.GetRabbitConnection();if (conn != null) {try {// 創(chuàng)建通道Channel channel = conn.createChannel();// 聲明隊(duì)列【參數(shù)說明:參數(shù)一:隊(duì)列名稱,參數(shù)二:是否持久化;參數(shù)三:是否獨(dú)占模式;參數(shù)四:消費(fèi)者斷開連接時(shí)是否刪除隊(duì)列;參數(shù)五:消息其他參數(shù)】channel.queueDeclare(Config.QueueName, false, false, false, null);// 創(chuàng)建訂閱器,并接受消息channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String routingKey = envelope.getRoutingKey(); // 隊(duì)列名稱String contentType = properties.getContentType(); // 內(nèi)容類型String content = new String(body, "utf-8"); // 消息正文System.out.println("消息正文:" + content);channel.basicAck(envelope.getDeliveryTag(), false); // 手動(dòng)確認(rèn)消息【參數(shù)說明:參數(shù)一:該消息的index;參數(shù)二:是否批量應(yīng)答,true批量確認(rèn)小于index的消息】}});} catch (Exception e) {e.printStackTrace();}} }代碼里面已經(jīng)寫了很詳細(xì)的注釋,在這里也不過多的介紹了。
執(zhí)行效果,如圖:
總結(jié)
以上是生活随笔為你收集整理的深入解读RabbitMQ工作原理及简单使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 年终盘点 | 2019年Java面试题汇
- 下一篇: sql无限递归查询