RocketMq重试及消息不丢失机制
1、消息重試機制
由于MQ經(jīng)常處于復(fù)雜的分布式系統(tǒng)中,考慮網(wǎng)絡(luò)波動、服務(wù)宕機、程序異常因素,很有可能出現(xiàn)消息發(fā)送或者消費失敗的問題。因此,消息的重試就是所有MQ中間件必須考慮到的一個關(guān)鍵點。如果沒有消息重試,就可能產(chǎn)生消息丟失的問題,可能對系統(tǒng)產(chǎn)生很大的影響。所以,秉承寧可多發(fā)消息,也不可丟失消息的原則,大部分MQ都對消息重試提供了很好的支持。
RocketMQ為使用者封裝了消息重試的處理流程,無需開發(fā)人員手動處理。RocketMQ支持了生產(chǎn)端和消費端兩類重試機制。
1.1 生產(chǎn)端重試
生產(chǎn)端配置的有發(fā)送失敗重試次數(shù),默認(rèn)為2。使用了set方法對外進行暴露,producer客戶端可以改寫這個默認(rèn)值。
public DefaultMQProducer(String producerGroup, RPCHook rpcHook) {this.createTopicKey = "TBW102";this.defaultTopicQueueNums = 4;this.sendMsgTimeout = 3000;this.compressMsgBodyOverHowmuch = 4096;//發(fā)送失敗,重試次數(shù)this.retryTimesWhenSendFailed = 2;this.retryAnotherBrokerWhenNotStoreOK = false;this.maxMessageSize = 131072;this.unitMode = false;this.producerGroup = producerGroup;this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);}1.2 消費端重試
消費者消費消息后,需要給Broker返回消費狀態(tài)。以MessageListenerConcurrently監(jiān)聽器為例,Consumer消費完成后需要返回ConsumeConcurrentlyStatus并發(fā)消費狀態(tài)。查看源碼,ConsumeConcurrentlyStatus是一個枚舉,共有兩種狀態(tài):
public enum ConsumeConcurrentlyStatus {//消費成功ConsumeConcurrentlyStatus,//消費失敗,一段時間后重試RECONSUME_LATER; }RECONSUME_LATER代表因為某種原因,消費失敗,稍后再試。后續(xù)會再次消費
官方文檔介紹如下:
RocketMQ中的消息無法無限次重新消費,當(dāng)然了,手動修改重試次數(shù)是可以的,不介入的話不行。當(dāng)重試次數(shù)超過所有延遲級別之后。消息會進入死信,死信Topic的命名為:%DLQ% + Consumer組名。
進入死信之后的消息肯定不會再投遞了,不過可以通過接口去查詢當(dāng)前RocketMQ中死信隊列的消息。如果在上層實現(xiàn)自有命令,那么可以將消息從死信中移出并重新投遞。
死信消息具有以下特性:
- 不會再被消費者正常消費。
- 有效期與正常消息相同,均為 3 天,3 天后會被自動刪除。因此,請在死信消息產(chǎn)生后的 3 天內(nèi)及時處理。
2、保證消息不丟失
分別從Producer發(fā)送機制、Broker的持久化機制,以及消費者的offSet機制來最大程度保證消息不易丟失
- 從Producer的視角來看:如果消息未能正確的存儲在MQ中,或者消費者未能正確的消費到這條消息,都是消息丟失。
- 從Broker的視角來看:如果消息已經(jīng)存在Broker里面了,如何保證不會丟失呢(宕機、磁盤崩潰)
- 從Consumer的視角來看:如果消息已經(jīng)完成持久化了,但是Consumer取了,但是未消費成功且沒有反饋,就是消息丟失
從Producer分析:如何確保消息正確的發(fā)送到了Broker?
- 默認(rèn)情況下,可以通過同步的方式阻塞式的發(fā)送,check SendStatus,狀態(tài)是OK,表示消息一定成功的投遞到了Broker,狀態(tài)超時或者失敗,則會觸發(fā)默認(rèn)的2次重試。此方法的發(fā)送結(jié)果,可能Broker存儲成功了,也可能沒成功
- 采取事務(wù)消息的投遞方式,并不能保證消息100%投遞成功到了Broker,但是如果消息發(fā)送Ack失敗的話,此消息會存儲在CommitLog當(dāng)中,但是對ConsumerQueue是不可見的??梢栽谌罩局胁榭吹竭@條異常的消息,嚴(yán)格意義上來講,也并沒有完全丟失
- RocketMQ支持 日志的索引,如果一條消息發(fā)送之后超時,也可以通過查詢?nèi)罩镜腁PI,來check是否在Broker存儲成功
從Broker分析:如果確保接收到的消息不會丟失?
- 消息支持持久化到Commitlog里面,即使宕機后重啟,未消費的消息也是可以加載出來的
- Broker自身支持同步刷盤、異步刷盤的策略,可以保證接收到的消息一定存儲在本地的內(nèi)存中
- Broker集群支持 1主N從的策略,支持同步復(fù)制和異步復(fù)制的方式,同步復(fù)制可以保證即使Master 磁盤崩潰,消息仍然不會丟失
從Cunmser分析:如何確保拉取到的消息被成功消費?
- 消費者可以根據(jù)自身的策略批量Pull消息
- Consumer自身維護一個持久化的offset(對應(yīng)MessageQueue里面的min offset),標(biāo)記已經(jīng)成功消費或者已經(jīng)成功發(fā)回到broker的消息下標(biāo)
- 如果Consumer消費失敗,那么它會把這個消息發(fā)回給Broker,發(fā)回成功后,再更新自己的offset
- 如果Consumer消費失敗,發(fā)回給broker時,broker掛掉了,那么Consumer會定時重試這個操作
- 如果Consumer和broker一起掛了,消息也不會丟失,因為consumer 里面的offset是定時持久化的,重啟之后,繼續(xù)拉取offset之前的消息到本地
總結(jié)
以上是生活随笔為你收集整理的RocketMq重试及消息不丢失机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FDG代谢增高是什么意思
- 下一篇: 布洛芬缓释胶囊退烧吗