RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器
本系列是「RabbitMQ實戰:高效部署分布式消息隊列」和 「RabbitMQ實戰指南」書籍的讀書筆記。
RabbitMQ 中重要概念
1. 生產者
生產者(producer)創建消息,然后發送到代理服務器(RabbitMQ Server),消息包括兩部分:有效載荷(payload)和標簽(label)。
有效載荷是要傳輸的數據,可以是任何內容,比如 JSON串、二進制、自定義的數據協議等;標簽描述了有效載荷,并且 RabbitMQ 用它來決定誰將獲得消息的投遞。
總結:生產者會創建消息并設置標簽。
2. 消費者
消費者會訂閱到隊列(queue)上,每當有消息到達 RabbitMQ 服務器時,會發送給消費者,消費者收到消息時,會進行處理。
消費者連接到 RabbitMQ 服務器,并訂閱到隊列上。當消費者消費一條消息時, 只是消費消息的消息體(payload)。在消息路由的過程中,消息的標簽會丟棄, 存入到隊列中的消息只有消息體,消費者也只會消費到消息體,也就不知道消息的生產者是誰,當然消費者也不需要知道。
3. 信道
應用程序和 RabbitMQ 建立 TCP 連接后,應用程序就可以創建一條 AMQP 信道,信道是建立在真實的 TCP 連接內的虛擬連接,AMQP 命令都是通過信道發出去的。每條信道都會被指派成唯一的一個 ID。
無論是發布消息、訂閱隊列或者接收消息,這些動作都是通過信道完成的。為什么要通過信道而不是 TCP 連接發送 AMQP 命令呢? 原因是對于操作系統來說建立和銷毀 TCP 會話是非常昂貴的開銷,每秒成百上千次地創建信道是不會影響操作系統的,在一條 TCP 連接上創建多少條信道是沒有限制的。
4. 代理
代理(Broker): 消息中間件的服務節點。
對于 RabbitMQ 來說, 一個 RabbitMQ Broker 可以簡單地看作一個 RabbitMQ 服務節點,或者 RabbitMQ 服務實例。大多數情況下也可以將一個 RabbitMQ Broker 看作一臺 RabbitMQ 服務器。
首先生產者將業務方數據進行可能的包裝, 之后封裝成消息, 發送(AMQP 協議里這個動作對應的命令為 Basic.Publish) 到 Broker 中。消費者訂閱并接收消息(AMQP 協議里這個動作對應的命令為 Basic.Consume 或者 Basic. Get),經過可能的解包處理得到原始的數據,之后再進行業務處理邏輯。這個業務處理邏輯并不一定需要和接收消息的邏輯使用同一個線程。
消費者進程可以使用一個線程去接收消息,存入到內存中,業務處理邏輯使用另一個線程從內存中讀取數據,這樣可以將應用進一步解稿,提高整個應用的處理效率。
5. 隊列
隊列是 RabbitMQ 的內部對象,用于存儲消息。
消費者通過以下兩種方式從特定的隊列中獲取消息:
- 通過 AMQP 的 basic.consume 命令訂閱(持續訂閱)
消費者在獲取到消息并處理后,會自動地從隊列中獲取下一條信息。
- 通過 AMQP 的 basic.get 命令訂閱(單條訂閱)
消費者只會獲取單條信息,并取消訂閱,如果要循環一直獲取消息則需要使用 basic.consume 命令而不是將 basic.get 命令放到循環中處理,因為這樣會嚴重影響 RabbitMQ 的性能。
消息發送到隊列中后,如果有消費者訂閱了該隊列,那么 RabbitMQ 會立即將該消息發送到訂閱的消費者,如果沒有消費者訂閱該消息隊列,那么消息將一直在隊列中等待,直到有消費者訂閱該隊列。
多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤(即這次給A 發送,下次給 B 發送)給多個消費者進行處理,而不是每個消費者都收到所有的消息井處理。
消費者接收到每一條消息都必須進行確認,消費者必須通過 AMQP 的 basic.ack 命令顯式地向 RabbitMQ 發送一個確認,或者在訂閱隊列的時候將 auto_ack 參數設置為 True。當設置了 auto_ack 后,一旦消費者接收到消息,那么 RabbitMQ 會自動視其確認了消息,消費者通過命令告訴 RabbitMQ 它已經正確接收了消息,同時 RabbitMQ 才能安全地把消息從隊列中刪除。
消費者在接收消息之后,在確認之前,如果從 RabbitMQ 斷開連接或者取消隊列,那么 RabbitMQ 會認為這條消息沒有分發,然后重新分發給下一個訂閱的消費者,確保消息被另一個消費者處理。RabbitMQ 后續將不會給當前訂閱者發送消息,因為 RabbitMQ 認為當前消費者并沒有準備好接收下一條消息。
在接收到消息后,如果發現消息格式有問題,想要明確拒絕,在消息尚未確認之前有以下兩個選擇:
- 把消費者從 RabbitMQ 斷開連接
這會導致 RabbitMQ 自動重新把消息入隊并發送給另一個消費者
- 使用 AMQP 的 basic.reject 命令
reject 命令的 requeue 參數為 True 時,RabbitMQ 會將消息發送給下一個訂閱的消費者;參數為 False 時,RabbitMQ 會立即把消息從隊列中刪除,而不會把它發送給新的消費者。
隊列設置參數:
- exclusive
該參數為 True 時,隊列將變成私有的。該參數可以用于限制一個隊列只能有一個消費者使用。
- auto-delete
最后一個消費者取消訂閱的時候,隊列就會自動移除。
嘗試聲明一個已經存在的隊列時,只要聲明參數完全匹配現存隊列的話,RabbitMQ 就什么也不做,并成功返回。
一般情況下,為了避免消息丟失,生產者和消費者都應該嘗試去創建隊列。
6. 交換器
Exchange: 交換器,它指定消息按什么規則,路由到哪個隊列。
生產者將消息發送到 Exchange (交換器,通常也可以用大寫的"X" 來表示),由交換器將消息路由到一個或者多個隊列中。如果路由不到,或許會返回給生產者,或許直接丟棄。
一個 Exchange 可以 binding 多個 Queue,一個 Queue 可以同多個 Exchange 進行 binding。交換器與隊列之間的關系是多對多的。
交換器的具體示意圖:
7. 路由鍵
RoutingKey : 路由鍵。用于把生產者的數據分配到交換器上,Exchange 根據這個關鍵字進行消息投遞。
生產者將消息發給交換器的時候,一般會指定一個 RoutingKey ,用來指定這個消息的路由規則,而這個 RoutingKey 需要與交換器類型和綁定鍵 (BindingKey) 聯合使用才能最終生效。
在交換器類型和綁定鍵(BindingKey)固定的情況下,生產者可以在發送消息給交換器時,通過指定 RoutingKey 來決定消息流向哪里。
8. 綁定鍵
BindingKey: 綁定,用于把交換器的消息綁定到隊列上,它的作用就是把 Exchange 和 Queue 按照路由規則綁定起來。
RabbitMQ 中通過綁定將交換器與隊列關聯起來,在綁定的時候一般會指定一個綁定鍵( BindingKey ) ,這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了。
如下代碼的 routing_key=‘world’ 其實就是綁定鍵。
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')
綁定鍵示意圖
生產者將消息發送給交換器時, 需要一個 RoutingKey , 當 BindingKey 和 RoutingKey 相匹配時,消息會被路由到對應的隊列中。在綁定多個隊列到同一個交換器的時候, 這些綁定允許使用相同的 BindingKey 。
BindingKey 并不是在所有的情況下都生效,它依賴于交換器類型, 比如 fanout 類型的交換器就會無視 BindingKey ,而是將消息路由到所有綁定到該交換器的隊列中。
在direct 交換器類型下,RoutingKey 和BindingKey 需要完全匹配才能使用;但是在topic 交換器類型下, RoutingKey 和 BindingKey 之間需要做模糊匹配,兩者并不是相同的。
BindingKey 其實也屬于路由鍵中的一種,官方解釋為: the routing key to use for the binding。
可以翻譯為:在綁定的時候使用的路由鍵。可以這么理解:
-
在使用綁定的時候,其中需要的路由鍵是 BindingKey,涉及的客戶端方法如:
channel.exchangeBind 、channel .queueBind ,對應的AMQP 命令為Exchange.Bind 、Queue.Bind 。 -
在發送消息的時候,其中需要的路由鍵是 RoutingKey,涉及的客戶端方法如channel.basicPublish,對應的 AMQP 命令為Basic.Publish。
9. 交換器類型
RabbitMQ 常用的交換器類型有 fanout 、direct、topic 、headers 這四種。
- fanout
它會把所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中。類似廣播消息。
fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。
- direct
direct 類型的交換器路由規則也很簡單,它會把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的隊列中。
以圖2-7 為例,交換器的類型為direct,
如果我們發送一條消息,并在發送消息的時候設置路由鍵為" warning" ,則消息會路由到 Queuel 和Queue2 。
如果在發送消息的時候設置路由鍵為 “info” 或者 “debug” ,消息只會路由到Queue2 。
如果以其他的路由鍵發送消息,則消息不會路由到這兩個隊列中。
請注意以上 Exchange 用的是虛線,為什么用虛線呢?是因為在用 Direct 模式的時候不需要指定對應的交換器,只需要指定對應的 Queue 就可以。那你可能又會問那為什么還把 Exchange 畫上去呢,原因是因為當你創建了對應的 vhost 之后 RabbitMQ 就會為我們創建對應的沒有名字的一個默認的 Exchange。
需要說明,AMQP 提供了 “默認交換器”:類型為 direct,名稱為空字符串。任何的隊列被創建時,即以隊列名稱作為綁定鍵,綁定到 “默認交換器”。
使用代碼:
channel.basicPublish("", QueueName, null, message)
推送 direct 交換器消息到對于的隊列,空字符為默認的 direct 交換器,用隊列名稱當做路由鍵。
- 持續消息獲取使用:
basic.consume; - 單個消息獲取使用:
basic.get;
當接收端訂閱者有多個的時候,direct 會輪詢公平的分發給每個訂閱者(訂閱者消息確認正常)。
- topic
與direct 類型的交換器相似,也是將消息路由到 BindingKey 和 RoutingKey 相匹配的隊列中,但這里的匹配規則有些不同,它約定:
- RoutingKey 為一個點號 ". " 分隔的字符串(被點號 “.” 分隔開的每一段獨立的字符串稱為一個單詞),如 “com.rabbitmq.client, java.util.concurrent, com.hidden.client”;
- BindingKey 和RoutingKey 一樣也是點號"."分隔的字符串;
- BindingKey 中可以存在兩種特殊字符串 * 和 # ,用于做模糊匹配,其中 * 用于匹配一個單詞, # 用于 0 個或者多個詞)。
以圖2-8 中的配置為例:
- 路由鍵為 “com.rabbitmq.client” 的消息會同時路由到Queuel 和Queue2;
- 路由鍵為 “com.hidden.client” 的消息只會路由到Queue2 中:
- 路由鍵為 “com.hidden.demo” 的消息只會路由到Queue2 中:
- 路由鍵為 “java.rabbitmq.demo” 的消息只會路由到Queuel 中:
- 路由鍵為 “java.util.concurrent” 的消息將會被丟棄或者返回給生產者(需要設置 mandatory 參數),因為它沒有匹配任何路由鍵。
- headers
headers 類型的交換器不依賴于路由鍵的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。
在綁定隊列和交換器時制定一組鍵值對, 當發送消息到交換器時,RabbitMQ 會獲取到該消息的 headers (也是一個鍵值對的形式) ,對比其中的鍵值對是否完全匹配隊列和交換器綁定時指定的鍵值對,如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列。
headers 類型的交換器性能會很差,而且也不實用,基本上不會看到它的存在。
10. RabbitMQ 總流程
10.1 生產者
(1)生產者連接到RabbitMQ Broker , 建立一個連接( Connection) ,開啟一個信道(Channel)
(2)生產者聲明一個交換器,并設置相關屬性,比如交換機類型、是否持久化等
(3)生產者聲明一個隊列井設置相關屬性,比如是否排他、是否持久化、是否自動刪除等
(4)生產者通過路由鍵將交換器和隊列綁定起來
(5)生產者發送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息
(6)相應的交換器根據接收到的路由鍵查找相匹配的隊列
(7)如果找到,則將從生產者發送過來的消息存入相應的隊列中
(8)如果沒有找到,則根據生產者配置的屬性選擇丟棄還是回退給生產者
(9)關閉信道
(10)關閉連接
10.2 消費者
(1)消費者連接到RabbitMQ Broker ,建立一個連接(Connection ) ,開啟一個信道(Channel)
(2)消費者向 RabbitMQ Broker 請求消費相應隊列中的消息,可能會設置相應的回調函數,以及做一些準備工作
(3)等待RabbitMQ Broker 回應并投遞相應隊列中的消息, 消費者接收消息
(4)消費者確認( ack) 接收到的消息
(5)RabbitMQ 從隊列中刪除相應己經被確認的消息
(6)關閉信道
(7)關閉連接
每個線程把持一個信道,所以信道復用了 Connection 的 TCP 連接。同時 RabbitMQ 可以確保每個線程的私密性,就像擁有獨立的連接一樣。
當每個信道的流量不是很大時,復用單一的 Connection 可以在產生性能瓶頸的情況下有效地節省 TCP 連接資源。但是當信道本身的流量很大時,這時候多個信道復用一個 Connection 就會產生性能瓶頸,進而使整體的流量被限制了。
此時就需要開辟多個 Connection ,將這些信道均攤到這些Connection 中,至于這些相關的調優策略需要根據業務自身的實際情況進行調節。
11. 虛擬主機和隔離
每一個 RabbitMQ 服務器都能創建一個消息服務器,我們稱之為虛擬主機(vhost), 每一個 vhost 實際上是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定以及自己的權限機制。
多個應用程序可以只運行一個 RabbitMQ 服務器的不同虛擬主機 vhost 上,這樣可以避免隊列和交換器的命名沖突,各個 vhost 之間邏輯是分離的。
RabbitMQ 包含了開箱即用的默認 vhost:’/’, 在連接時如果沒有指定 vhost 就會使用默認的 vhost。
vhost 之間是絕對隔離的。當在 RabbitMQ 里面創建一個用戶時,用戶通常會被指派給至少一個 vhost,并且只能訪問被指派 vhost 內的隊列、交換器和綁定。
所以多個用戶可以共用一個 RabbitMQ,其中只需要將不同的用戶使用不同的 vhost 進行分開,這樣大家就只需要搭建一個 MQ 服務器共同使用,從而互補影響。
- 創建 vhost
rabbitmqctl add_vhost [vhost_name]
- 刪除 vhost
rabbitmqctl delete_vhost [vhost_name]
- 查詢運行哪些 vhost
rabbitmqctl list_vhosts
12. 死信隊列
在某些情況下,例如當一個消息無法被成功路由時,消息或許會被返回給生產者并被丟棄。或者,如果我們為消息設置了有效期,延期后消息會被放入一個所謂的死信隊列中。此時,消息生產者可以選擇配置死信隊列參數來處理這些特殊情況。
一般來講呢,死信隊列都是一些過期的或者不需要處理的消息,我們這邊其實就是故意利用了消息過期之后進入死信隊列這個特性來處理延遲任務,為消息設置需要延遲的時間的等長有效期,等消息過期之后從死信隊列里面拿出消息處理。
一般來說以下情況會導致消息進入死信隊列:
1.消息被拒絕 basic.reject/basic.nack 并且設置 requeue 為 false(不重回隊列)的時候,消息就會進入死信隊列。
2. 消息隊列 TTL 過期或者消息有效期過期。
3. 隊列達到最大的長度,并且我們沒有設置自動拒絕消息的時候,隊首的消息就會進入死信隊列。
總結
以上是生活随笔為你收集整理的RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ 入门系列(1)— Ub
- 下一篇: RabbitMQ 入门系列(3)— 生产