MQ保证消息的可靠性传输
關注微信公眾號“蝦米聊吧”,后續持續放送“技術架構和資料”干貨!!
前景概要:
我們在使用mq的時候肯定是希望數據不能多也不能少的,不能多即需要考慮mq的重復消費(冪等性)問題。不能少即不能丟失數據。
很明顯如果如果你的mq中存放的是非常核心、重要的數據,比如訂單數據、積分流水等,
這些數據是必須不能丟的,一旦丟失則會造成數據的一致性問題,可能會給公司帶來極大損失。
?
丟失數據分析:
丟失數據一般來說分為2種情況,一種是數據已經發送到了mq,但是mq自身把數據弄丟了,第二種是我們在消費的時候把數據弄丟了,接下來簡單從rabbitmq和kafka來分析一下。
tips
根據之前的使用場景來看,一般rabbitmq都會存放一些比較核心業務的數據,理論上是不能弄丟的;
kafka一般用于大數據領域,承載較多的可能是一些日志數據。
?
?
(1)RabbitMq
?
1)生產者丟失數據
?
關鍵點:rabbitmq開啟confirm確認機制后,rabbitmq通過異步回調的形式告訴生產者是否已接收到。
?
生產者將數據發送到rabbitmq的時候,可能數據就在半路給丟了,因為網絡問題或者機宕機問題等都有可能。
一般來說,要確保發送給rabbitmq的消息不能丟失,可以開啟confirm模式,在生產者端設置開啟confirm模式之后,每次寫入mq的消息都會分配一個唯一的id,如果寫入了rabbitmq中,rabbitmq則會回傳一個ack確認消息,告訴你說這個消息已經成功寫入了。如果rabbitmq沒能處理這個消息,會回調你一個nack接口,告訴你這個消息接收失敗,你可以重試。而且你可以結合這個機制自己在內存里維護每個消息id的狀態,如果超過一定時間還沒接收到這個消息的回調,那么你可以重發。
?
2)rabbitmq自身丟失了數據
關鍵點:開啟持久化機制
這個就是說rabbitmq自己弄丟了數據,所以我們必須開啟rabbitmq的持久化機制,當消息寫入之后會持久化到磁盤,即使是rabbitmq自己掛了,恢復之后會自動讀取之前存儲的數據,一般數據不會丟。除非極其罕見的是,rabbitmq還沒持久化,自己就掛了,可能導致少量數據會丟失的,但是這個概率較小。
?
設置持久化有兩個步驟:
第一個是創建queue的時候將其設置為持久化的,這樣就可以保證rabbitmq持久化queue的元數據,但是不會持久化queue里的數據;
第二個是發送消息的時候將消息的deliveryMode設置為2,就是將消息設置為持久化,此時rabbitmq就會將消息持久化到磁盤上去。必須要同時設置這兩個持久化才行,rabbitmq即使掛了,當再次重啟的時候,也會從磁盤上恢復queue,恢復這個queue里的數據。
而且持久化可以跟生產者那邊的confirm機制配合起來,只有消息被持久化到磁盤之后,才會通知生產者ack了,所以哪怕是在持久化到磁盤之前,rabbitmq掛了,數據丟了,生產者收不到ack,你也是可以自己重發的。
但是即使開啟了rabbitmq持久化機制,也有一種可能仍然導致數據丟失,就是這個消息寫到了rabbitmq中,但是還沒來得及持久化到磁盤上,結果不巧,此時rabbitmq掛了,就會導致內存里的一點點數據會丟失。
?
3)消費端丟失了數據
?
關鍵點:關閉自動ack,開啟手動ack
?
這個很容易理解,主要是因為消費者那一端在消費的時候,剛消費到數據還沒完成處理過程,結果消費者服務的進程掛了,但是rabbitmq會認為你已經消費了,此時這數據就丟了。
這個時候就需要采用rabbitmq提供的ack機制,簡單來說,就是你需要關閉rabbitmq自動ack,可以通過一個api來調用就行,然后每次你自己代碼里確保處理完成的時候,然后程序里再執行ack。這樣的話,ack的主動權就掌握在我們自己手里了,即使數據還沒處理完,也不會執行ack,這樣的話rabbitmq就不會認為這條消息已經被消費了,所以消息是不會丟的。
?
(2)Kafka
?
1) 消費端弄丟了數據
關鍵點:關閉自動提交offset,手動提交offset
唯一可能導致消費者弄丟數據的情況,就是消費者消費到了這個消息,然后消費者那邊自動提交了offset,讓kafka以為你已經消費了這個消息,但其實你還沒處理完程序的邏輯就掛了,此時這條消息就丟了。
我們都知道kafka默認會自動提交offset,因此只要關閉自動提交offset,在處理完之后自己手動提交offset,就可以保證數據不會丟。但是此時可能會出現另外一個問題就是重復消費,比如說你剛處理完代碼邏輯,還沒來得及提交offset,結果程序就掛了,此時肯定會重復消費一次,因此這個需要自身在消費者那端保證冪等性。
?
2)kafka自己弄丟了數據
較常見的一個場景就是kafka某個broker宕機,然后重新選舉partition的leader時候,如果此時其他的follower分片剛好還有些數據沒有同步,結果這個時候leader掛了,然后選舉某個follower成leader之后,它就會少了一部分數據。
?
所以此時一般是要求起碼設置如下4個參數:
給這個topic設置replication.factor參數:這個值必須大于1,要求每個partition必須有至少2個副本;
在kafka服務端設置min.insync.replicas參數:這個值必須大于1,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯系,沒掉隊,這樣才能確保leader掛了還有一個follower吧;
在producer端設置acks=all:這個是要求每條數據,必須是寫入所有replica副本之后,才能認為是寫成功了;
在producer端設置retries=MAX(可以設置一個比較大的值,重試次數):這個是要求一旦寫入失敗,進行重試;
?
3)生產者會不會弄丟數據
?
如果按照上述的思路設置了ack=all,一定不會丟,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。
| 關注微信公眾號“蝦米聊吧”,后續持續放送“技術架構和資料”干貨!! ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ?掃碼關注獲取更多技術架構知識干貨喲~ |
?
總結
以上是生活随笔為你收集整理的MQ保证消息的可靠性传输的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android如何实现语音播放与录音功能
- 下一篇: 合肥:置换新能源车将可获 5000 元内