rabbitmq详细入门文档+springboot结合使用
在介紹RabbitMQ之前,我們先來看下面一個電商項目的場景:
-
商品的原始數據保存在數據庫中,增刪改查都在數據庫中完成。
-
搜索服務數據來源是索引庫(Elasticsearch),如果數據庫商品發生變化,索引庫數據不能及時更新。
-
商品詳情做了頁面靜態化處理,靜態頁面數據也不會隨著數據庫商品更新而變化。
如果我們在后臺修改了商品的價格,搜索頁面和商品詳情頁顯示的依然是舊的價格,這樣顯然不對。該如何解決? ?
我們可能會想到這么做:
-
方案1:每當后臺對商品做增刪改操作,同時修改索引庫數據及更新靜態頁面。
-
方案2:搜索服務和商品頁面靜態化服務對外提供操作接口,后臺在商品增刪改后,調用接口。?
?這兩種方案都有個嚴重的問題:就是代碼耦合,后臺服務中需要嵌入搜索和商品頁面服務,違背了微服務的獨立原則。
這時,我們就會采用另外一種解決辦法,那就是消息隊列!?
? ? ? ? 商品服務對商品增刪改以后,無需去操作索引庫和靜態頁面,只需向MQ發送一條消息(比如包含商品id的消息),也不關心消息被誰接收。 搜索服務和靜態頁面服務監聽MQ,接收消息,然后分別去處理索引庫和靜態頁面(根據商品id去更新索引庫和商品詳情靜態頁面)。?
什么是消息隊列
MQ全稱為Message Queue,即消息隊列。“消息隊列”是在消息的傳輸過程中保存消息的容器。它是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。
?
開發中消息隊列通常有如下應用場景:
1、任務異步處理:
高并發環境下,由于來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MySQL,直接導致無數的行鎖表鎖,甚至最后請求會堆積過多,從而觸發too many connections錯誤。通過使用消息隊列,我們可以異步處理請求,從而緩解系統的壓力。將不需要同步處理的并且耗時長的操作由消息隊列通知消息接收方進行異步處理。減少了應用程序的響應時間。
2、應用程序解耦合:
MQ相當于一個中介,生產方通過MQ與消費方交互,它將應用程序進行解耦合。
?
AMQP和JMS
MQ是消息通信的模型,并發具體實現。現在實現MQ的有兩種主流方式:AMQP、JMS。
兩者間的區別和聯系:
-
JMS是定義了統一的接口,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式
-
JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
-
JMS規定了兩種消息模型;而AMQP的消息模型更加豐富
常見MQ產品
-
ActiveMQ:基于JMS
-
RabbitMQ:基于AMQP協議,erlang語言開發,穩定性好
-
RocketMQ:基于JMS,阿里巴巴產品,目前交由Apache基金會
-
Kafka:分布式消息系統,高吞吐量
RabbitMQ快速入門
RabbitMQ是由erlang語言開發,基于AMQP(Advanced Message Queue 高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統開發中應用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com
下載與安裝
RabbitMQ由Erlang語言開發,需要安裝與RabbitMQ版本對應的Erlang語言環境,具體的就不解釋了,自行搜索教程。RabbitMQ官網下載地址:http://www.rabbitmq.com/download.html
RabbitMQ的工作原理
下圖是RabbitMQ的基本結構:
組成部分說明:
- Broker:消息隊列服務進程,此進程包括兩個部分:Exchange和Queue
- Exchange:消息隊列交換機,按一定的規則將消息路由轉發到某個隊列,對消息進行過慮。
- Queue:消息隊列,存儲消息的隊列,消息到達隊列并轉發給指定的
- Producer:消息生產者,即生產方客戶端,生產方客戶端將消息發送
- Consumer:消息消費者,即消費方客戶端,接收MQ轉發的消息。
生產者發送消息流程:
1、生產者和Broker建立TCP連接。
2、生產者和Broker建立通道。
3、生產者通過通道消息發送給Broker,由Exchange將消息進行轉發。
4、Exchange將消息轉發到指定的Queue(隊列)
消費者接收消息流程:
1、消費者和Broker建立TCP連接
2、消費者和Broker建立通道
3、消費者監聽指定的Queue(隊列)
4、當有消息到達Queue時Broker默認將消息推送給消費者。
5、消費者接收到消息。
6、ack回復
六種消息模型
①基本消息模型:
在上圖的模型中,有以下概念:
-
P:生產者,也就是要發送消息的程序
-
C:消費者:消息的接受者,會一直等待消息到來。
-
queue:消息隊列,圖中紅色部分。可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。?
?生產者
新建一個maven工程,添加amqp-client依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.1</version> </dependency>連接工具類:
public class ConnectionUtil {/*** 建立與RabbitMQ的連接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定義連接工廠ConnectionFactory factory = new ConnectionFactory();//設置服務地址factory.setHost("192.168.1.103");//端口factory.setPort(5672);//設置賬號信息,用戶名、密碼、vhostfactory.setVirtualHost("/kavito");//設置虛擬機,一個mq服務可以設置多個虛擬機,每個虛擬機就相當于一個獨立的mqfactory.setUsername("kavito");factory.setPassword("123456");// 通過工廠獲取連接Connection connection = factory.newConnection();return connection;} }生產者發送消息:
public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 1、獲取到連接Connection connection = ConnectionUtil.getConnection();// 2、從連接中創建通道,使用通道才能完成消息相關的操作Channel channel = connection.createChannel();// 3、聲明(創建)隊列//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4、消息內容String message = "Hello World!";// 向指定的隊列中發送消息//參數:String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設置為"")* 2、routingKey,路由key,交換機根據路由key來將消息轉發到指定的隊列,如果使用默認交換機,routingKey設置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//關閉通道和連接(資源關閉最好用try-catch-finally語句處理)channel.close();connection.close();} }?控制臺:
web管理頁面:服務器地址/端口號 (本地:127.0.0.1:15672,默認用戶及密碼:guest guest)
點擊隊列名稱,進入詳情頁,可以查看消息:
消費者接收消息
public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();// 聲明隊列//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關閉隊列則自動刪除,如果將此參數設置true可用于臨時隊列的創建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數和exclusive參數設置為true就可以實現臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數,可以設置一個隊列的擴展參數,比如:可設置存活時間*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//實現消費方法DefaultConsumer consumer = new DefaultConsumer(channel){// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用/*** 當接收到消息后此方法將被調用* @param consumerTag 消費者標簽,用來標識消費者的,在監聽隊列時設置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息體String msg = new String(body,"utf-8");System.out.println(" [x] received : " + msg + "!");}};// 監聽隊列,第二個參數:是否自動進行消息確認。//參數:String queue, boolean autoAck, Consumer callback/*** 參數明細:* 1、queue 隊列名稱* 2、autoAck 自動回復,當消費者接收到消息后要告訴mq消息已接收,如果將此參數設置為tru表示會自動回復mq,如果設置為false要通過編程實現回復* 3、callback,消費方法,當消費者接收到消息要執行的方法*/channel.basicConsume(QUEUE_NAME, true, consumer);} }控制臺打印:
再看看隊列的消息,已經被消費了
?
?我們發現,消費者已經獲取了消息,但是程序沒有停止,一直在監聽隊列中是否有新的消息。一旦有新的消息進入隊列,就會立即打印.
消息確認機制(ACK)
通過剛才的案例可以看出,消息一旦被消費者接收,隊列中的消息就會被刪除。
那么問題來了:RabbitMQ怎么知道消息被接收了呢?
如果消費者領取消息后,還沒執行操作就掛掉了呢?或者拋出了異常?消息消費失敗,但是RabbitMQ無從得知,這樣消息就丟失了!
因此,RabbitMQ有一個ACK機制。當消費者獲取消息后,會向RabbitMQ發送回執ACK,告知消息已經被接收。不過這種回執ACK分兩種情況:
-
自動ACK:消息一旦被接收,消費者自動發送ACK
-
手動ACK:消息接收后,不會發送ACK,需要手動調用
大家覺得哪種更好呢?
這需要看消息的重要性:
-
如果消息不太重要,丟失也沒有影響,那么自動ACK會比較方便
-
如果消息非常重要,不容丟失。那么最好在消費完成后手動ACK,否則接收消息后就自動ACK,RabbitMQ就會把消息從隊列中刪除。如果此時消費者宕機,那么消息就丟失了。
我們之前的測試都是自動ACK的,如果要手動ACK,需要改動我們的代碼:
public class Recv2 {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 創建通道final Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [x] received : " + msg + "!");// 手動進行ACK/** void basicAck(long deliveryTag, boolean multiple) throws IOException;* deliveryTag:用來標識消息的id* multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。*/channel.basicAck(envelope.getDeliveryTag(), false);}};// 監聽隊列,第二個參數false,手動進行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);} }最后一行代碼設置第二個參數為false
channel.basicConsume(QUEUE_NAME, false, consumer);自動ACK存在的問題
修改消費者,添加異常,如下:
生產者不做任何修改,直接運行,消息發送成功: ?
?運行消費者,程序拋出異常:
管理界面:
?消費者拋出異常,但是消息依然被消費,實際上我們還沒獲取到消息。
演示手動ACK
重新運行生產者發送消息:
同樣,在手動進行ack前拋出異常,運行Recv2
?再看看管理界面:
?消息沒有被消費掉!
還有另外一種情況:修改消費者Recv2,把監聽隊列第二個參數自動改成手動,(去掉之前制造的異常) ,并且消費方法中沒手動進行ACK
?
?生產者代碼不變,再次運行:
運行消費者 :
但是,查看管理界面,發現:
停掉消費者的程序,發現: ?
這是因為雖然我們設置了手動ACK,但是代碼中并沒有進行消息確認!所以消息并未被真正消費掉。當我們關掉這個消費者,消息的狀態再次變為Ready。
正確的做法是:
我們要在監聽隊列時設置第二個參數為false,代碼中手動進行ACK
?
?再次運行消費者,查看web管理頁面:
消費者消費成功! ?
生產者避免數據丟失:https://www.cnblogs.com/vipstone/p/9350075.html
②work消息模型
工作隊列或者競爭消費者模式
work queues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息,但是一個消息只能被一個消費者獲取。
這個消息模型在Web應用程序中特別有用,可以處理短的HTTP請求窗口中無法處理復雜的任務。
接下來我們來模擬這個流程:
P:生產者:任務的發布者
C1:消費者1:領取任務并且完成任務,假設完成速度較慢(模擬耗時)
C2:消費者2:領取任務并且完成任務,假設完成速度較快
?
生產者
生產者循環發送50條消息
public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循環發布任務for (int i = 0; i < 50; i++) {// 消息內容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 2);}// 關閉通道和連接channel.close();connection.close();} }消費者1
public class Recv {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();//創建會話通道,生產者和mq服務所有通信都在channel通道中完成Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//實現消費方法DefaultConsumer consumer = new DefaultConsumer(channel){// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 即消息體String msg = new String(body,"utf-8");System.out.println(" [消費者1] received : " + msg + "!");//模擬任務耗時1stry { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }}};// 監聽隊列,第二個參數:是否自動進行消息確認。channel.basicConsume(QUEUE_NAME, true, consumer);} }消費者2
代碼不貼了,與消費者1基本類似,只是消費者2沒有設置消費耗時時間。
接下來,兩個消費者一同啟動,然后發送50條消息:
?
可以發現,兩個消費者各自消費了不同25條消息,這就實現了任務的分發。???
能者多勞
剛才的實現有問題嗎?
-
消費者1比消費者2的效率要低,一次任務的耗時較長
-
然而兩人最終消費的消息數量是一樣的
-
消費者2大量時間處于空閑狀態,消費者1一直忙碌
現在的狀態屬于是把任務平均分配,正確的做法應該是消費越快的人,消費的越多。
怎么實現呢?
通過 BasicQos 方法設置prefetchCount = 1。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理1個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。相反,它會將其分派給不是仍然忙碌的下一個Consumer。
值得注意的是:prefetchCount在手動ack的情況下才生效,自動ack不生效。
再次測試:?
?
訂閱模型分類
說明下:
1、一個生產者多個消費者
2、每個消費者都有一個自己的隊列
3、生產者沒有將消息直接發送給隊列,而是發送給exchange(交換機、轉發器)
4、每個隊列都需要綁定到交換機上
5、生產者發送的消息,經過交換機到達隊列,實現一個消息被多個消費者消費
例子:注冊->發郵件、發短信
X(Exchanges):交換機一方面:接收生產者發送的消息。另一方面:知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
Exchange類型有以下幾種:
Fanout:廣播,將消息交給所有綁定到交換機的隊列
Direct:定向,把消息交給符合指定routing key 的隊列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
Header:header模式與routing不同的地方在于,header模式取消routingkey,使用header中的?key/value(鍵值對)匹配隊列。
Header模式不展開了,感興趣可以參考這篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
③Publish/subscribe(交換機類型:Fanout,也稱為廣播?)
Publish/subscribe模型示意圖 :
?
生產者
和前面兩種模式不同:
-
1) 聲明Exchange,不再聲明Queue
-
2) 發送消息到Exchange,不再發送到Queue
消費者1?(注冊成功發給短信服務)
public class Recv {private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信隊列private final static String EXCHANGE_NAME = "test_fanout_exchange";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [短信服務] received : " + msg + "!");}};// 監聽隊列,自動返回完成channel.basicConsume(QUEUE_NAME, true, consumer);} }消費者2(注冊成功發給郵件服務)
public class Recv2 {private final static String QUEUE_NAME = "fanout_exchange_queue_email";//郵件隊列private final static String EXCHANGE_NAME = "test_fanout_exchange";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [郵件服務] received : " + msg + "!");}};// 監聽隊列,自動返回完成channel.basicConsume(QUEUE_NAME, true, consumer);} }我們運行兩個消費者,然后發送1條消息:
思考:
1、publish/subscribe與work queues有什么區別。
區別:
1)work queues不用定義交換機,而publish/subscribe需要定義交換機。
2)publish/subscribe的生產方是面向交換機發送消息,work queues的生產方是面向隊列發送消息(底層使用默認交換機)。
3)publish/subscribe需要設置隊列和交換機的綁定,work queues不需要設置,實際上work queues會將隊列綁定到默認的交換機 。
相同點:
所以兩者實現的發布/訂閱的效果是一樣的,多個消費端監聽同一個隊列不會重復消費消息。
2、實際工作用?publish/subscribe還是work queues。
建議使用 publish/subscribe,發布訂閱模式比工作隊列模式更強大(也可以做到同一隊列競爭),并且發布訂閱模式可以指定自己專用的交換機。
④Routing 路由模型(交換機類型:direct)
Routing模型示意圖:
?
P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
接下來看代碼:
生產者
public class Send {private final static String EXCHANGE_NAME = "test_direct_exchange";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 消息內容,String message = "注冊成功!請短信回復[T]退訂";// 發送消息,并且指定routing key 為:sms,只有短信服務能接收到消息channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }消費者1
public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信隊列private final static String EXCHANGE_NAME = "test_direct_exchange";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。可以指定多個channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收發送方指定routing key為sms的消息//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [短信服務] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }消費者2
public class Recv2 {private final static String QUEUE_NAME = "direct_exchange_queue_email";//郵件隊列private final static String EXCHANGE_NAME = "test_direct_exchange";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。可以指定多個channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收發送方指定routing key為email的消息// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [郵件服務] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }我們發送sms的RoutingKey,發現結果:只有指定短信的消費者1收到消息了
?
⑤Topics 通配符模式(交換機類型:topics)
Topics模型示意圖:
?每個消費者監聽自己的隊列,并且設置帶統配符的routingkey,生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列。
Routingkey一般都是有一個或者多個單詞組成,多個單詞之間以“.”分割,例如:inform.sms
通配符規則:
#:匹配一個或多個詞
*:匹配不多不少恰好1個詞
舉例:
audit.#:能夠匹配audit.irs.corporate?或者?audit.irs
audit.*:只能匹配audit.irs
從示意圖可知,我們將發送所有描述動物的消息。消息將使用由三個字(兩個點)組成的Routing key發送。路由關鍵字中的第一個單詞將描述速度,第二個顏色和第三個種類:“<speed>.<color>.<species>”。
我們創建了三個綁定:Q1綁定了“*.orange.*”,Q2綁定了“.*.*.rabbit”和“lazy.#”。
Q1匹配所有的橙色動物。
Q2匹配關于兔子以及懶惰動物的消息。
?下面做個小練習,假如生產者發送如下消息,會進入哪個隊列:
quick.orange.rabbit? ? ? ?Q1 Q2? ?routingKey="quick.orange.rabbit"的消息會同時路由到Q1與Q2
lazy.orange.elephant? ? Q1 Q2
quick.orange.fox? ? ? ? ? ?Q1
lazy.pink.rabbit? ? ? ? ? ? ? Q2? (值得注意的是,雖然這個routingKey與Q2的兩個bindingKey都匹配,但是只會投遞Q2一次)
quick.brown.fox? ? ? ? ? ? 不匹配任意隊列,被丟棄
quick.orange.male.rabbit? ?不匹配任意隊列,被丟棄
orange? ? ? ? ?不匹配任意隊列,被丟棄
下面我們以指定Routing key="quick.orange.rabbit"為例,驗證上面的答案
生產者
public class Send {private final static String EXCHANGE_NAME = "test_topic_exchange";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類型為topicchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 消息內容String message = "這是一只行動迅速的橙色的兔子";// 發送消息,并且指定routing key為:quick.orange.rabbitchannel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());System.out.println(" [動物描述:] Sent '" + message + "'");channel.close();connection.close();} }消費者1
public class Recv {private final static String QUEUE_NAME = "topic_exchange_queue_Q1";private final static String EXCHANGE_NAME = "test_topic_exchange";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。訂閱所有的橙色動物channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者1] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }消費者2
public class Recv2 {private final static String QUEUE_NAME = "topic_exchange_queue_Q2";private final static String EXCHANGE_NAME = "test_topic_exchange";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機,同時指定需要訂閱的routing key。訂閱關于兔子以及懶惰動物的消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");// 定義隊列的消費者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費者2] received : " + msg + "!");}};// 監聽隊列,自動ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} }結果C1、C2是都接收到消息了:
?
?⑥RPC
RPC模型示意圖:
基本概念:
Callback queue 回調隊列,客戶端向服務器發送請求,服務器端處理請求后,將其處理結果保存在一個存儲體中。而客戶端為了獲得處理結果,那么客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to。
Correlation id 關聯標識,客戶端可能會發送多個請求給服務器,當服務器處理完后,客戶端無法辨別在回調隊列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在發送每個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就可以分辨此響應屬于哪個請求。
流程說明:
- 當客戶端啟動的時候,它創建一個匿名獨享的回調隊列。
- 在 RPC 請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另一個是設置唯一值的 correlation_id 屬性。
- 將請求發送到一個 rpc_queue 隊列中。
- 服務器等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工作并且將帶有執行結果的消息發送給 reply_to 字段指定的隊列。
- 客戶端等待回調隊列里的數據。當有消息出現的時候,它會檢查 correlation_id 屬性。如果此屬性的值與請求匹配,將它返回給應用
分享兩道面試題:
面試題:
避免消息堆積?
1) 采用workqueue,多個消費者監聽同一隊列。
2)接收到消息以后,而是通過線程池,異步消費。
如何避免消息丟失?
1) 消費者的ACK機制。可以防止消費者丟失消息。
但是,如果在消費者消費之前,MQ就宕機了,消息就沒了?
2)可以將消息進行持久化。要將消息持久化,前提是:隊列、Exchange都持久化
交換機持久化
隊列持久化
消息持久化
?
?Spring整合RibbitMQ
下面還是模擬注冊服務當用戶注冊成功后,向短信和郵件服務推送消息的場景
搭建SpringBoot環境
創建兩個工程 mq-rabbitmq-producer和mq-rabbitmq-consumer,分別配置1、2、3(第三步本例消費者用注解形式,可以不用配)
1、添加AMQP的啟動器:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐test</artifactId> </dependency>2、在application.yml中添加RabbitMQ的配置:?
server:port: 10086 spring:application:name: mq-rabbitmq-producerrabbitmq:host: 192.168.1.103port: 5672username: kavitopassword: 123456virtualHost: /kavitotemplate:retry:enabled: trueinitial-interval: 10000msmax-interval: 300000msmultiplier: 2exchange: topic.exchangepublisher-confirms: true屬性說明:?
-
template:有關AmqpTemplate的配置
-
retry:失敗重試
-
enabled:開啟失敗重試
-
initial-interval:第一次重試的間隔時長
-
max-interval:最長重試間隔,超過這個間隔將不再重試
-
multiplier:下次重試間隔的倍數,此處是2即下次重試間隔是上次的2倍
-
-
exchange:缺省的交換機名稱,此處配置后,發送消息如果不指定交換機就會使用這個
-
-
publisher-confirms:生產者確認機制,確保消息會正確發送,如果發送失敗會有錯誤回執,從而觸發重試
當然如果consumer只是接收消息而不發送,就不用配置template相關內容。 ?
?3、定義RabbitConfig配置類,配置Exchange、Queue、及綁定交換機。
@Configuration public class RabbitmqConfig {public static final String QUEUE_EMAIL = "queue_email";//email隊列public static final String QUEUE_SMS = "queue_sms";//sms隊列public static final String EXCHANGE_NAME="topic.exchange";//topics類型交換機public static final String ROUTINGKEY_EMAIL="topic.#.email.#";public static final String ROUTINGKEY_SMS="topic.#.sms.#";//聲明交換機@Bean(EXCHANGE_NAME)public Exchange exchange(){//durable(true) 持久化,mq重啟之后交換機還在return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//聲明email隊列/** new Queue(QUEUE_EMAIL,true,false,false)* durable="true" 持久化 rabbitmq重啟的時候不需要創建新的隊列* auto-delete 表示消息隊列沒有在使用時將被自動刪除 默認是false* exclusive 表示該消息隊列是否只在當前connection生效,默認是false*/@Bean(QUEUE_EMAIL)public Queue emailQueue(){return new Queue(QUEUE_EMAIL);}//聲明sms隊列@Bean(QUEUE_SMS)public Queue smsQueue(){return new Queue(QUEUE_SMS);}//ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey@Beanpublic Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,@Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();}//ROUTINGKEY_SMS隊列綁定交換機,指定routingKey@Beanpublic Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,@Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();}}生產者(mq-rabbitmq-producer)
為了方便測試,我直接把生產者代碼放工程測試類:發送routing key是"topic.sms.email"的消息,那么mq-rabbitmq-consumer下那些監聽的(與交換機(topic.exchange)綁定,并且訂閱的routingkey中匹配了"topic.sms.email"規則的) 隊列就會收到消息。
@SpringBootTest @RunWith(SpringRunner.class) public class Send {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void sendMsgByTopics(){/*** 參數:* 1、交換機名稱* 2、routingKey* 3、消息內容*/for (int i=0;i<5;i++){String message = "恭喜您,注冊成功!userid="+i;rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"topic.sms.email",message);System.out.println(" [x] Sent '" + message + "'");}} }運行測試類發送5條消息:?
web管理界面:?可以看到已經創建了交換機以及queue_email、queue_sms 2個隊列,并且向這兩個隊列分別發送了5條消息
?
?
消費者(mq-rabbitmq-consumer)
編寫一個監聽器組件,通過注解配置消費者隊列,以及隊列與交換機之間綁定關系。(也可以像生產者那樣通過配置類配置)
在SpringAmqp中,對消息的消費者進行了封裝和抽象。一個JavaBean的方法,只要添加@RabbitListener注解,就可以成為了一個消費者。
@Component public class ReceiveHandler {//監聽郵件隊列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_email", durable = "true"),exchange = @Exchange(value = "topic.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"topic.#.email.#","email.*"}))public void rece_email(String msg){System.out.println(" [郵件服務] received : " + msg + "!");}//監聽短信隊列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_sms", durable = "true"),exchange = @Exchange(value = "topic.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"topic.#.sms.#"}))public void rece_sms(String msg){System.out.println(" [短信服務] received : " + msg + "!");} }屬性說明:?
-
@Componet:類上的注解,注冊到Spring容器
-
@RabbitListener:方法上的注解,聲明這個方法是一個消費者方法,需要指定下面的屬性:
-
bindings:指定綁定關系,可以有多個。值是@QueueBinding的數組。@QueueBinding包含下面屬性:
-
value:這個消費者關聯的隊列。值是@Queue,代表一個隊列
-
exchange:隊列所綁定的交換機,值是@Exchange類型
-
key:隊列和交換機綁定的RoutingKey,可指定多個
-
-
啟動mq-rabbitmq-comsumer項目
ok,郵件服務和短息服務接收到消息后,就可以各自開展自己的業務了。
總結
以上是生活随笔為你收集整理的rabbitmq详细入门文档+springboot结合使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 地牢房间迷宫走廊生成(二),Python
- 下一篇: C++学习之路 | PTA乙级—— 10