RabbitMQ学习之消息可靠性及特性
轉載自?https://blog.csdn.net/zhu_tianwei/article/details/53971296
下面主要從隊列、消息發送、消息接收方面了解消息傳遞過的一些可靠性處理。?
1、隊列?
消費者是無法訂閱或者獲取不存在的MessageQueue中信息。消息被Exchange接受以后,如果沒有匹配的Queue,則會被丟棄。?
聲明一個隊列?
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)?
durable:聲明隊列持久化?
exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,并在連接斷開時自動刪除。這里需要注意三點:其一,排他隊列是基于連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的。其二,“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同。其三,即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的。這種隊列適用于只限于一個客戶端發送讀取消息的應用場景。?
autoDelete:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用于臨時隊列。?
其他選項,channel.queueDeclarePassive:例如如果用戶僅僅想查詢某一個隊列是否已存在,如果不存在,不想建立該隊列,仍然可以調用queue.declare,只不過需要將參數passive設為true,傳給queue.declare,如果該隊列已存在,則會返回true;如果不存在,則會返回Error,但是不會創建新的隊列。?
2、發送消息?
1.發送消息設置?
channel.basicPublish(exchange, routingKey, mandatory, immediate, basicProperties, body);?
basicProperties:通過參數實現消息持久化,MessageProperties.PERSISTENT_TEXT_PLAIN
mandatory:當mandatory標志位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那么會調用basic.return方法將消息返還給生產者;當mandatory設為false時,出現上述情形broker會直接將消息扔掉。?
immediate:當immediate標志位設置為true時,如果exchange在將消息route到queue(s)時發現對應的queue上沒有消費者,那么這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。?
2.事務機制?
對事務的支持是AMQP協議的一個重要特性。假設當生產者將一個持久化消息發送給服務器時,因為consume命令本身沒有任何Response返回,所以即使服務器崩潰,沒有持久化該消息,生產者也無法獲知該消息已經丟失。如果此時使用事務,即通過txSelect()開啟一個事務,然后發送消息給服務器,然后通過txCommit()提交該事務,即可以保證,如果txCommit()提交了,則該消息一定會持久化,如果txCommit()還未提交即服務器崩潰,則該消息不會服務器就收。當然Rabbit MQ也提供了txRollback()命令用于回滾某一個事務。
3.Confirm機制?
事務機制會帶來大量的多余開銷,并會導致吞吐量下降 250% 。為了補救事務帶來的問題,引入了 confirmation 機制(即 Publisher Confirm)。如果設置channel為confirm狀態,則通過該channel發送的消息都會被分配一個唯一的ID,然后一旦該消息被正確的路由到匹配的隊列中后,服務器會返回給生產者一個Confirm,該Confirm包含該消息的ID,這樣生產者就會知道該消息已被正確分發。對于持久化消息,只有該消息被持久化后,才會返回Confirm。Confirm機制的最大優點在于異步,生產者在發送消息以后,即可繼續執行其他任務。而服務器返回Confirm后,會觸發生產者的回調函數,生產者在回調函數中處理Confirm信息。如果消息服務器發生異常,導致該消息丟失,會返回給生產者一個nack,表示消息已經丟失,這樣生產者就可以通過重發消息,保證消息不丟失。Confirm機制在性能上要比事務優越很多。但是Confirm機制,無法進行回滾,就是一旦服務器崩潰,生產者無法得到Confirm信息,生產者其實本身也不知道該消息吃否已經被持久化,只有繼續重發來保證消息不丟失,但是如果原先已經持久化的消息,并不會被回滾,這樣隊列中就會存在兩條相同的消息,系統需要支持去重。可以mandatory配合實現消息的發送可靠性。
3、消息接收?
1.autoAck?
為了確保消息一定被消費者處理,rabbitMQ提供了消息確認功能,就是在消費者處理完任務之后,就給服務器一個回饋,服務器就會將該消息刪除,如果消費者超時不回饋,那么服務器將就將該消息重新發送給其他消費者。默認是開啟的,在消費者端通過下面的方式開啟消息確認, 首先將autoAck自動確認關閉,等我們的任務執行完成之后,手動的去確認.
2.公平調度?
讓每個消費者在同一時刻會分配一個任務。 通過channel.basicQos(prefetchCount);可以設置。?
3.exclusive?
和queue一樣,設置了true,只有第一個啟動的消費者可用。?
channel.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer)
總結
以上是生活随笔為你收集整理的RabbitMQ学习之消息可靠性及特性的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis持久化存储AOF与RDB
- 下一篇: redis持久化策略梳理及主从环境下的策