RabbitMQ核心模式
文章目錄
- 工作原理圖
- 環境搭建
- Hello World(簡單模式)
- 生產者
- 消費者
- Work Queues(工作隊列模式)
- 消息應答
- 自動應答
- 手動應答
- RabbitMQ 持久化
- 隊列持久化
- 消息持久化
- 不公平分發
- prefetch預取值
- Publisher Confirms(發布確認模式)
- Publish/Subscribe(發布訂閱模式)
- 交換機(Exchanges)
- 無名exchange:
- 綁定(bindings)
- 系統默認exchange 類型
- Fanout 介紹
- Direct exchange
- Topics(主題模式)
- Topics的routing_key要求
工作原理圖
環境搭建
先創建用戶,然后給用戶分配角色和權限,權限是相對于虛擬主機的(一個MQ服務器實體可以對應多個虛擬主機),可以先查看該服務器下有多少個虛擬主機
Hello World(簡單模式)
用 Java 編寫兩個程序。發送單個消息的生產者和接收消息并打印出來的消費者。在下圖中,“ P”是我們的生產者,“ C”是我們的消費者。中間的框是一個隊列-RabbitMQ 代表使用者保留的消息緩沖區
生產者
package cn.com.mq.helloworld;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;/*** @author pangjian* @ClassName Producer* @Description 簡單模式下,生產者發送消息* @date 2022/8/6 11:02*/public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {// 創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("admin");factory.setPassword("admin");// channel 實現了自動 close 接口 自動關閉 不需要顯示關閉// 創建連接Connection connection = factory.newConnection();// 獲取信道Channel channel = connection.createChannel();/*** 生成一個隊列* 1.隊列名稱* 2.隊列里面的消息是否持久化 也就是是否用完就刪除* 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費* 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除* 5.其他參數*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello world";/*** 發送一個消息* 1.發送到那個交換機,簡單模式沒有涉及到交換機使用,為“”* 2.路由的 key 是哪個,也就是發送到哪一個隊列* 3.其他的參數信息* 4.發送消息的消息體*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息發送完畢");}}方法執行了兩次
可以管理界面看到隊列名稱、消息狀態和消息總數
消費者
package cn.com.mq.helloworld;import com.rabbitmq.client.*;/*** @author pangjian* @ClassName Consumer* @Description 消費者* @date 2022/8/6 11:29*/public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息.........");// 推送的消息如何進行消費的接口回調,可以寫拿到數據后的業務了,這里只是簡單打印DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println(message);};// 取消消費的一個回調接口 如:在消費的時候隊列被刪除掉了CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消費被中斷");};/*** 消費者消費消息 - 接受消息* 1.消費哪個隊列* 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答* 3.消費者成功消費的回調* 4.消息被取消時的回調*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}}
消費完畢
Work Queues(工作隊列模式)
工作隊列(又稱:任務隊列——Task Queues)是為了避免等待一些占用大量資源、時間的操作(例如圖片縮放、pdf文件轉換)。當我們把任務(Task)當作消息發送到隊列中,一個運行在后臺的工作者(worker)進程就會取出任務然后處理。當你運行多個工作者(workers),任務就會在它們之間輪詢分發消息任務,讓消息不堆積。
import cn.com.mq.util.RabbitMqUtils; import com.rabbitmq.client.*;/*** @author pangjian* @ClassName Work* @Description 工作進程* @date 2022/8/6 11:51*/public class Work {private final static String QUEUE_NAME = "workQueues";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.........");// w2可以更改,讓每一個進程得以標識DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("w2處理:" + message);};// 取消消費的一個回調接口 如:在消費的時候隊列被刪除掉了CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消費被中斷");};/*** 消費者消費消息 - 接受消息* 1.消費哪個隊列* 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答* 3.消費者成功消費的回調* 4.消息被取消時的回調*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}}更改啟動參數后(讓原理進程保留不被重新啟動),啟動兩次該Work類的main方法,映射兩個執行進程
public class Producer {private final static String QUEUE_NAME = "workQueues";public static void main(String[] args) throws Exception {// 創建一個連接工廠Channel channel = RabbitMqUtils.getChannel();/*** 生成一個隊列* 1.隊列名稱* 2.隊列里面的消息是否持久化 也就是是否用完就刪除* 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費* 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除* 5.其他參數*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 向消息隊列發送10條記錄for (int i = 0; i < 10; i++) {String message = "hello world" + i;/*** 發送一個消息* 1.發送到那個交換機,簡單模式沒有涉及到交換機使用,為“”* 2.路由的 key 是哪個,也就是發送到哪一個隊列* 3.其他的參數信息* 4.發送消息的消息體*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());}System.out.println("消息發送完畢");}}
剛好每個工作進程分配到5個消息,并且是順序,這種分發稱為輪詢
消息應答
如果消費者處理一個長的任務并僅只完成了部分突然它掛掉了,RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消息標記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續發送給該消費這的消息,因為它無法接收到。為了保證消息在發送過程中不丟失,rabbitmq 引入消息應答機制,消息應答就是:消費者在接收到消息并且處理該消息之后,告訴 rabbitmq 它已經處理了,rabbitmq 可以把該消息刪除了。
自動應答
消息發送后立即被認為已經傳送成功,但如果消費者那邊出現連接或者 channel 關閉,那么消息就丟失了,當然這種模式消費者那邊可以沒有對傳遞的消息數量進行限制,但是這樣有可能使得消費者這邊由于接收太多還來不及處理的消息,導致這些消息的積壓,最終使得內存耗盡,最終這些消費者線程被操作系統殺死,所以這種模式僅適用在消費者可以高效并以某種速率能夠處理這些消息的情況下使用。
手動應答
為了保證消息不會被丟失,RabbitMQ提供了消息手動響應(acknowledgments)。消費者會通過一個ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然后RabbitMQ才會釋放并刪除這條消息。如果消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認為消息沒有被完全處理,然后重新發送給其他消費者(consumer)。這樣,即使工作者(workers)偶爾的掛掉,也不會丟失消息。
生產者先發送8條消息進mq隊列,然后兩個消費者去完成該8條消息處理
import cn.com.mq.util.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;/*** @author pangjian* @ClassName Work* @Description 手動應答ack工作進程* @date 2022/8/6 11:51*/public class AckWork {private final static String QUEUE_NAME = "ackQueues";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.........");// w2可以更改,讓每一個進程得以標識DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("w1處理:" + message + "未完成");// 模擬長消息任務處理中可能會掛掉的場景try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("w1處理:" + message + "已完成");/*** 1.消息標記 tag* 2.是否批量應答未應答消息,false表示不批量,一個消息一個確認,批量類似計網移動窗口,2345只要5確認就代表234也得到確認了*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 取消消費的一個回調接口 如:在消費的時候隊列被刪除掉了CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消費被中斷");};/*** 消費者消費消息 - 接受消息* 1.消費哪個隊列* 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答* 3.消費者成功消費的回調* 4.消息被取消時的回調*/channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);}}
w2接收到消息任務4,沒處理完成宕機了,消息并沒有丟失,而是重新分發給了w1處理,這就是手動響應,消息不會丟失
RabbitMQ 持久化
默認情況下 RabbitMQ 退出或由于某種原因崩潰時,它忽視隊列和消息,除非告知它不要這樣做。確保消息不會丟失需要做兩件事:隊列和消息都標記為持久化。
隊列持久化
- 生產者代碼
持久化后的隊列
消息持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());將消息標記為持久化并不能完全保證不會丟失消息。盡管它告訴 RabbitMQ 將消息保存到磁盤,但是這里依然存在當消息剛準備存儲在磁盤的時候 但是還沒有存儲完,消息還在緩存的一個間隔點。此時并沒 有真正寫入磁盤。持久性保證并不強,但是對于我們的簡單任務隊列而言,這已經綽綽有余了。如果需要更強有力的持久化策略,參考后邊課件發布確認章節。
不公平分發
在最開始的時候我們學習到 RabbitMQ 分發消息采用的輪訓分發,但是在某種場景下這種策略并不是很好,比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非常快,而另外一個消費者 2 處理速度卻很慢,這個時候我們還是采用輪訓分發的化就會到這處理速度快的這個消費者很大一部分時間處于空閑狀態,而處理慢的那個消費者一直在干活,這種分配方式在這種情況下其實就不太好,但是 RabbitMQ 并不知道這種情況它依然很公平的進行分發。
- 每一個消費者的代碼,設置優先級
這樣處理快的消費者會被分發到更多的消息
prefetch預取值
我們設置預取計數值為1。就是告訴RabbitMQ一次只向一個worker發送一條消息(如果值為5,則5條消息會積壓在信道中,等待5條消息中一條處理完成后再向信道中發送新的消息)。換句話說,在處理并確認前一個消息之前(業務未完成之前),不要向工作人員發送新消息。
Publisher Confirms(發布確認模式)
生產者將信道設置成 confirm 模式,所有在該信道上面發布的消息都將會被指派一個唯一的 ID(從 1 開始),一旦消息被投遞到所有匹配的隊列之后,broker 就會發送一個確認給生產者(包含消息的唯一 ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那么確認消息會在將消息寫入磁盤之后發出,broker 回傳給生產者的確認消息中 delivery-tag 域包含了確認消息的序列號,此外 broker 也可以設置basic.ack 的 multiple 域,表示到這個序列號之前的所有消息都已經得到了處理。
package cn.com.mq.cf;import cn.com.mq.util.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback;import java.util.UUID; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap;/*** @author pangjian* @ClassName PublisherConfirms* @Description 發布確保消息不被丟失的* @date 2022/8/6 19:21*/public class PublisherConfirms {public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {publishMessageBatch();publishMessageIndividually();publishMessageAsync();}/*** 單個發送*/public static void publishMessageIndividually() throws Exception {Channel channel = RabbitMqUtils.getChannel();// 隊列聲明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, true, false, false, null);// 開啟發布確認channel.confirmSelect();long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());// 服務端返回 false 或超時時間內未返回,生產者可以消息重發boolean flag = channel.waitForConfirms();if (flag) {System.out.println("消息發送成功");}}long end = System.currentTimeMillis();System.out.println("發布" + MESSAGE_COUNT + "個單獨確認消息,耗時" + (end - begin) + "ms");}/*** 批量*/public static void publishMessageBatch() throws Exception {Channel channel = RabbitMqUtils.getChannel();// 隊列聲明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, true, false, false, null);// 開啟發布確認channel.confirmSelect();// 批量確認消息大小int batchSize = 100;// 未確認消息個數int outstandingMessageCount = 0;long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", queueName, null, message.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {channel.waitForConfirms();outstandingMessageCount = 0;}}// 為了確保還有剩余沒有確認消息 再次確認if (outstandingMessageCount > 0) {channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("發布" + MESSAGE_COUNT + "個批量確認消息,耗時" + (end - begin) + "ms");}public static void publishMessageAsync() throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName, false, false, false, null);// 開啟發布確認channel.confirmSelect();/*** 線程安全有序的一個哈希表,適用于高并發的情況* 1.輕松的將序號與消息進行關聯* 2.輕松批量刪除條目 只要給到序列號* 3.支持并發訪問*/ConcurrentSkipListMap<Long, String> outstandingConfirms = newConcurrentSkipListMap<>();/*** 消息發布成功回調(如果設置了持久化則持久化成功才算發布成功)* 1.消息序列號* 2.true 可以確認小于等于當前序列號的消息* false 確認當前序列號消息*/ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {if (multiple) {// 返回的是小于等于當前序列號的未確認消息 是一個 mapConcurrentNavigableMap<Long, String> confirmed =outstandingConfirms.headMap(sequenceNumber, true);// 清除該部分未確認消息confirmed.clear();}else{// 只清除當前序列號的消息outstandingConfirms.remove(sequenceNumber);}};/*** @Description:消息發布失敗回調* @return void* @date 2022/8/6 19:59*/ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {String message = outstandingConfirms.get(sequenceNumber);System.out.println("發布的消息"+message+"未被確認,序列號"+sequenceNumber);};/*** 添加一個異步確認的監聽器,要在發送消息之前設置,才能收到所有的消息確認信息* 1.確認收到消息的回調* 2.未收到消息的回調*/channel.addConfirmListener(ackCallback, null);long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "消息" + i;/*** channel.getNextPublishSeqNo()獲取下一個消息的序列號* 通過序列號與消息體進行一個關聯* 全部都是未確認的消息體*/outstandingConfirms.put(channel.getNextPublishSeqNo(), message);channel.basicPublish("", queueName, null, message.getBytes());}long end = System.currentTimeMillis();System.out.println("發布" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) + "ms");}}}Publish/Subscribe(發布訂閱模式)
之前我們創建了一個工作隊列,每個任務都恰好交付給一個消費者(工作進程)。在這一部分中,我們將一個消息傳達給多個消費者。這種模式 稱為 ”發布/訂閱”。
交換機(Exchanges)
RabbitMQ 消息傳遞模型的核心思想是: 生產者生產的消息從不會直接發送到隊列。通常生產者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。生產者只能將消息發送到交換機(exchange),交換機工作的內容非常簡單,==一方面它接收來自生產者的消息,另一方面將它們推入隊列。==交換機必須確切知道如何處理收到的消息。是應該把這些消息放到特定隊列還是說把他們到許多隊列中還是說應該丟棄它們。這就的由交換機的類型來決定。
無名exchange:
在前面部分我們對 exchange 一無所知,但仍然能夠將消息發送到隊列。之前能實現的 原因是因為我們使用的是默認交換,我們通過空字符串(“”)進行標識。第一個參數是交換機的名稱。空字符串表示默認或無名稱交換機:消息能路由發送到隊列中其實是由 routingKey(bindingkey)綁定 key 指定的,如果它存在的話
綁定(bindings)
- binding 其實是 exchange 和 queue 之間的橋梁,它告訴我們 exchange 和那個隊列進行了綁定關系。比如說下面這張圖告訴我們的就是 X 與 Q1 和 Q2 進行了綁定
系統默認exchange 類型
Fanout 介紹
它是將接收到的所有消息廣播到它知道的所有隊列中
為了說明這種模式,我們將構建一個簡單的日志系統。它將由兩個程序組成:第一個程序將發出日志消 息,第二個程序是消費者。其中我們會啟動兩個消費者,其中一個消費者接收到消息后把日志存儲在磁盤,另外一個消費者接收到消息后把消息打印在屏幕上(一個消息被兩個消費者同時處理)
- 消費者創建一個隨機隊列和聲明一個Fanout交換機,隊列綁定Fanout類型交換機,最后消費監聽
- 生產者聲明一個Fanout交換機,往交換機發送信息
Direct exchange
之前日志系統將所有消息廣播給所有消費者,對此我們想做一些改變,例如希望將日志消息寫入磁盤的程序僅接收嚴重錯誤(errros),而不存儲哪些警告(warning)或信息(info)日志消息避免浪費磁盤空間。Fanout 這種交換類型并不能給我們帶來很大的靈活性:它只能進行無意識的廣播,在這里我們將使用 direct 這種類型來進行替換,這種類型的工作方式是,消息只去到它綁定的routingKey 隊列中去。這時要用到bindings,綁定是交換機和隊列之間的橋梁關系。也可以這么理解:
- 隊列只對它綁定的交換機的消息感興趣。
- 綁定用參數:routingKey 來表示也可稱該參數為 binding key,綁定之后的意義由其交換類型決定。
在上面這張圖中,我們可以看到 X 綁定了兩個隊列,綁定類型是 direct。隊列 disk 綁定鍵為 error,隊列 console 綁定鍵有兩個:一個綁定鍵為 info,另一個綁定鍵為 warning.在這種綁定情況下,生產者發布消息到 exchange 上,綁定鍵為 error 的消息會被發布到隊列disk。綁定鍵為 info和warning的消息會被發布到隊列 console,其他消息類型的消息將被丟棄。
package cn.com.mq.direct;import cn.com.mq.util.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;/*** @author pangjian* @ClassName ReceiveLogsDirect01* @Description 消費者* @date 2022/8/7 1:31*/public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "disk";// 隊列聲明channel.queueDeclare(queueName, false, false, false, null);// 隊列綁定channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息...");// 發送回調DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message = "接收綁定鍵:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;System.out.println("error 消息已經接收:\n" + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}} package cn.com.mq.direct;import cn.com.mq.util.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;/*** @author pangjian* @ClassName ReceiveLogsDirect02* @Description 消費者* @date 2022/8/7 1:32*/public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "console";// 隊列聲明channel.queueDeclare(queueName, false, false, false, null);// 隊列綁定channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息...");// 發送回調DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message = "接收綁定鍵:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;System.out.println("info和warning 消息已經接收:\n" + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}} package cn.com.mq.direct;import cn.com.mq.util.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel;import java.util.HashMap; import java.util.Map;/*** @author pangjian* @ClassName EmitLogDirect* @Description 日志消息生產者* @date 2022/8/7 1:32*/public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();// 聲明交換機類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 創建多個 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info", "普通 info 信息");bindingKeyMap.put("warning", "警告 warning 信息");bindingKeyMap.put("error", "錯誤 error 信息");// debug 沒有消費這接收這個消息 所有就丟失了bindingKeyMap.put("debug", "調試 debug 信息");for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {// 獲取 key valueString bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();// 遍歷鍵值,把對應消息發送到指定交換機,并綁定上該消息對應的bindingKeychannel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));System.out.println("生產者發出消息:" + message);}}}Topics(主題模式)
之前我們改進了日志記錄系統。我們沒有使用只能進行隨意廣播的 fanout 交換機,而是使用了 direct 交換機,從而有能實現有選擇性地接收日志。盡管使用 direct 交換機改進了我們的系統,但是它仍然存在局限性——比方說我們想接收的日志類型有 info.base 和 info.advantage,某個隊列只想 info.base 的消息,那這個時候direct 就辦不到了。這個時候就只能使用 topic 類型
Topics的routing_key要求
發送到類型是 topic 交換機的消息的 routing_key 不能隨意寫,必須滿足一定的要求,它必須是一個單
詞列表,以點號分隔開。這些單詞可以是任意單詞,比如說:“stock.usd.nyse”, “nyse.vmw”,
“quick.orange.rabbit”.這種類型的。在這個規則列表中,其中有兩個替換符:
- *(星號)可以代替一個單詞
- #(井號)可以替代零個或多個單詞
- 中間帶 orange 帶 3 個單詞的字符串*.orange.*
- 最后一個單詞是 rabbit 的 3 個單詞*.*.rabbit
- 第一個單詞是 lazy 的多個單詞lazy.#
| quick.orange.rabbit | 被隊列 Q1Q2 接收到 |
| azy.orange.elephant | 被隊列 Q1Q2 接收到 |
| quick.orange.fox | 被隊列 Q1 接收到 |
| lazy.brown.fox | 被隊列 Q2 接收到 |
| lazy.pink.rabbit | 雖然滿足兩個綁定但只被隊列 Q2 接收一次 |
| quick.brown.fox | 不匹配任何綁定不會被任何隊列接收到會被丟棄 |
| quick.orange.male.rabbit | 是四個單詞不匹配任何綁定會被丟棄 |
| lazy.orange.male.rabbit | 是四個單詞但匹配 Q2 |
- 當一個隊列綁定鍵只是#,那么這個隊列將接收所有數據,就有點像 fanout 了(什么消息都接收)
- 如果隊列綁定鍵當中沒有#和*出現,那么該隊列綁定類型就是 direct 了
運行結果為上圖所示
總結
以上是生活随笔為你收集整理的RabbitMQ核心模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: maven常见面试题
- 下一篇: 化工图纸中LISP_化工工艺图纸标识代号