RabbitMQ-从基础到实战(3)— 消息的交换(上)
轉載請注明出處
0.目錄
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ
RabbitMQ-從基礎到實戰(2)— 防止消息丟失
RabbitMQ-從基礎到實戰(4)— 消息的交換(中)
RabbitMQ-從基礎到實戰(5)— 消息的交換(下)
RabbitMQ-從基礎到實戰(6)— 與Spring集成
1.簡介
在前面的例子中,每個消息都只對應一個消費者,即使有多個消費者在線,也只會有一個消費者接收并處理一條消息,這是消息中間件的一種常用方式。
另外一種方式,生產者生產一條消息,廣播給一個或多個隊列,所有訂閱了這個隊列的消費者,都可以消費這條消息,這就是消息訂閱。
官方教程列舉了這樣一個場景,生產者發出一條記錄日志的消息,消費者1接收到后寫日志到硬盤,消費者2接收到后打印日志到屏幕。工作中還有很多這樣的場景有待發掘,適當的使用消息訂閱后可以成倍的增加效率。
2.RabbitMQ的交換中心(Exchange)
在前兩章的例子中,我們涉及到了三個概念
這不禁讓我們以為,生產者生產消息后直接到發送到隊列,消費者從隊列中獲取消息,再消費掉。
其實這是錯誤的,在RabbitMQ中,生產者不會直接把消息發送給隊列,實際上,生產者甚至不知道一條消息會不會被發送到隊列上。
正確的概念是,生產者會把消息發送給RabbitMQ的交換中心(Exchange),Exchange的一側是生產者,另一側則是一個或多個隊列,由Exchange決定一條消息的生命周期--發送給某些隊列,或者直接丟棄掉。
這個概念在官方文檔中被稱作RabbitMQ消息模型的核心思想(core idea)
如下圖,其中X代表的是Exchange。
RabbitMQ中,有4種類型的Exchange
- direct??? 通過消息的routing key比較queue的key,相等則發給該queue,常用于相同應用多實例之間的任務分發
- 默認類型?? 本身是一個direct類型的exchange,routing key自動設置為queue name。注意,direct不等于默認類型,默認類型是在queue沒有指定exchange時的默認處理方式,發消息時,exchange字段也要相應的填成空字符串“”
- topic??? 話題,通過可配置的規則分發給綁定在該exchange上的隊列,通過地理位置推送等場景適用
- headers??? 當分發規則很復雜,用routing key不好表達時適用,忽略routing key,用header取代之,header可以為非字符串,例如Integer或者String
- fanout??? 分發給所有綁定到該exchange上的隊列,忽略routing key,適用于MMO游戲、廣播、群聊等場景
更詳細的介紹,請看官方文檔
3.臨時隊列
可以對一個隊列命名是十分重要的,在消費者消費消息時,要指明消費哪個隊列的消息(下面的queue),這樣就可以讓多個消費者同時分享一個隊列
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
上述記錄日志的場景中,有以下幾個特點
- 所有消費者都需要監聽所有的日志消息,因此每個消費者都需要一個單獨的隊列,不需要和別人分享
- 消費者只關心最新的消息,連接到RabbitMQ之前的消息不需要關心,因此,每次連接時需要創建一個隊列,綁定到相應的exchange上,連接斷開后,刪除該隊列
自己聲明隊列是比較麻煩的,因此,RabbitMQ提供了簡便的獲取臨時隊列的方法,該隊列會在連接斷開后銷毀
String queueName = channel.queueDeclare().getQueue();這行代碼會獲取一個名字類似于“amq.gen-JzTY20BRgKO-HjmUJj0wLg”的臨時隊列
4.綁定
再次注意,在RabbitMQ中,消息是發送到Exchange的,不是直接發送的Queue。因此,需要把Queue和Exchange進行綁定,告訴RabbitMQ把指定的Exchange上的消息發送的這個隊列上來
綁定隊列使用此方法
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
其中,queue是隊列名,exchange是要綁定的交換中心,routingKey就是這個queue的routingKey
5.實踐
下面來實現上述場景,生產者發送日志消息,消費者1記錄日志,消費者2打印日志
下面的代碼中,把連接工廠等方法放到了構造函數中,也就是說,每new一個對象,都會創建一個連接,在生產環境這樣做是很浪費性能的,每次創建一個connection都會建立一次TCP連接,生產環境應使用連接池。而Channel又不一樣,多個Channel是共用一個TCP連接的,因此可以放心的獲取Channel(本結論出自官方文檔對Channel的定義)
AMQP 0-9-1 connections are multiplexed with?channels?that can be thought of as "lightweight connections that share a single TCP connection".
For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.
日志消息發送類 LogSender
1 import java.io.IOException; 2 import java.util.concurrent.TimeoutException; 3 4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10 11 public class LogSender { 12 13 private Logger logger = LoggerFactory.getLogger(LogSender.class); 14 private ConnectionFactory factory; 15 private Connection connection; 16 private Channel channel; 17 18 /** 19 * 在構造函數中獲取連接 20 */ 21 public LogSender(){ 22 super(); 23 try { 24 factory = new ConnectionFactory(); 25 factory.setHost("127.0.0.1"); 26 connection = factory.newConnection(); 27 channel = connection.createChannel(); 28 } catch (Exception e) { 29 logger.error(" [X] INIT ERROR!",e); 30 } 31 } 32 /** 33 * 提供個關閉方法,現在并沒有什么卵用 34 * @return 35 */ 36 public boolean closeAll(){ 37 try { 38 this.channel.close(); 39 this.connection.close(); 40 } catch (IOException | TimeoutException e) { 41 logger.error(" [X] CLOSE ERROR!",e); 42 return false; 43 } 44 return true; 45 } 46 47 /** 48 * 我們更加關心的業務方法 49 * @param message 50 */ 51 public void sendMessage(String message) { 52 try { 53 //聲明一個exchange,命名為logs,類型為fanout 54 channel.exchangeDeclare("logs", "fanout"); 55 //exchange是logs,表示發送到此Exchange上 56 //fanout類型的exchange,忽略routingKey,所以第二個參數為空 57 channel.basicPublish("logs", "", null, message.getBytes()); 58 logger.debug(" [D] message sent:"+message); 59 } catch (IOException e) { 60 e.printStackTrace(); 61 } 62 } 63 }在LogSender中,和之前的例子不一樣的地方是,我們沒有直接聲明一個Queue,取而代之的是聲明了一個exchange
發布消息時,第一個參數填了我們聲明的exchange名字,routingKey留空,因為fanout類型忽略它。
在前面的例子中,我們routingKey填的是隊列名,因為默認的exchange(exchange位空字符串時使用默認交換中心)會把隊列的routingKey設置為queueName(聲明隊列的時候設置的,不是發送消息的時候),又是direct類型,所以可以通過queueName當做routingKey找到隊列。
消費類 LogConsumer
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import com.rabbitmq.client.AMQP; 10 import com.rabbitmq.client.Channel; 11 import com.rabbitmq.client.Connection; 12 import com.rabbitmq.client.ConnectionFactory; 13 import com.rabbitmq.client.Consumer; 14 import com.rabbitmq.client.DefaultConsumer; 15 import com.rabbitmq.client.Envelope; 16 17 public class LogConsumer { 18 19 private Logger logger = LoggerFactory.getLogger(LogConsumer.class); 20 private ConnectionFactory factory; 21 private Connection connection; 22 private Channel channel; 23 24 /** 25 * 在構造函數中獲取連接 26 */ 27 public LogConsumer() { 28 super(); 29 try { 30 factory = new ConnectionFactory(); 31 factory.setHost("127.0.0.1"); 32 connection = factory.newConnection(); 33 channel = connection.createChannel(); 34 // 聲明exchange,防止生產者沒啟動,exchange不存在 35 channel.exchangeDeclare("logs","fanout"); 36 } catch (Exception e) { 37 logger.error(" [X] INIT ERROR!", e); 38 } 39 } 40 41 /** 42 * 提供個關閉方法,現在并沒有什么卵用 43 * 44 * @return 45 */ 46 public boolean closeAll() { 47 try { 48 this.channel.close(); 49 this.connection.close(); 50 } catch (IOException | TimeoutException e) { 51 logger.error(" [X] CLOSE ERROR!", e); 52 return false; 53 } 54 return true; 55 } 56 57 /** 58 * 我們更加關心的業務方法 59 */ 60 public void consume() { 61 try { 62 // 獲取一個臨時隊列 63 String queueName = channel.queueDeclare().getQueue(); 64 // 把剛剛獲取的隊列綁定到logs這個交換中心上,fanout類型忽略routingKey,所以第三個參數為空 65 channel.queueBind(queueName, "logs", ""); 66 //定義一個Consumer,消費Log消息 67 Consumer consumer = new DefaultConsumer(channel) { 68 @Override 69 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 70 byte[] body) throws IOException { 71 String message = new String(body, "UTF-8"); 72 logger.debug(" [D] 我是來打印日志的:"+message); 73 } 74 }; 75 //這里自動確認為true,接收到消息后該消息就銷毀了 76 channel.basicConsume(queueName, true, consumer); 77 } catch (IOException e) { 78 e.printStackTrace(); 79 } 80 } 81 }復制一個項目,把72行改為如下代碼,代表兩個做不同工作的消費者
1 logger.debug(" [D] 我已經把消息寫到硬盤了:"+message);消費者App
1 public class App 2 { 3 public static void main( String[] args ) 4 { 5 LogConsumer consumer = new LogConsumer(); 6 consumer.consume(); 7 } 8 }生產者App
1 public class App { 2 public static void main( String[] args ) throws InterruptedException{ 3 LogSender sender = new LogSender(); 4 while(true){ 5 sender.sendMessage(System.nanoTime()+""); 6 Thread.sleep(1000); 7 } 8 } 9 }把消費者打包成兩個可執行的jar包,方便觀察控制臺
用java -jar 命令執行,結果如下
6.結束語
本章介紹了RabbitMQ中消息的交換,再次強調,RabbitMQ中,消息是通過交換中心轉發到隊列的,不要被默認的exchange混淆,默認的exchange會自動把queue的名字設置為它的routingKey,所以消息發布時,才能通過queueName找到該隊列,其實此時queueName扮演的角色就是routingKey。
本教程是參考官方文檔寫出來的,后續章節會介紹更多RabbitMQ的相關知識以及項目中的實戰技巧
轉載于:https://www.cnblogs.com/4----/p/6549865.html
總結
以上是生活随笔為你收集整理的RabbitMQ-从基础到实战(3)— 消息的交换(上)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: iOS开源项目周报0316
- 下一篇: javascript-基本数据类型和转换