rocketmq怎么保证数据不会重复_RocketMQ保证信息有序性和防止重复
分布式開(kāi)放消息系統(tǒng)(RocketMQ)的原理與實(shí)踐
分布式消息系統(tǒng)做為實(shí)現(xiàn)分布式系統(tǒng)可擴(kuò)展、可伸縮性的關(guān)鍵組件,須要具備高吞吐量、高可用等特色。而談到消息系統(tǒng)的設(shè)計(jì),就回避不了兩個(gè)問(wèn)題:java
消息的順序問(wèn)題
消息的重復(fù)問(wèn)題
RocketMQ做為阿里開(kāi)源的一款高性能、高吞吐量的消息中間件,它是怎樣來(lái)解決這兩個(gè)問(wèn)題的?RocketMQ 有哪些關(guān)鍵特性?其實(shí)現(xiàn)原理是怎樣的?web
關(guān)鍵特性以及其實(shí)現(xiàn)原理
1、順序消息
消息有序指的是一類(lèi)消息消費(fèi)時(shí),能按照發(fā)送的順序來(lái)消費(fèi)。例如:一個(gè)訂單產(chǎn)生了 3 條消息,分別是訂單建立、訂單付款、訂單完成。消費(fèi)時(shí),要按照這個(gè)順序消費(fèi)才有意義。但同時(shí)訂單之間又是能夠并行消費(fèi)的。算法
假如生產(chǎn)者產(chǎn)生了2條消息:M一、M2,要保證這兩條消息的順序,應(yīng)該怎樣作?你腦中想到的多是這樣:服務(wù)器
你可能會(huì)采用這種方式保證消息順序網(wǎng)絡(luò)
M1發(fā)送到S1后,M2發(fā)送到S2,若是要保證M1先于M2被消費(fèi),那么須要M1到達(dá)消費(fèi)端后,通知S2,而后S2再將M2發(fā)送到消費(fèi)端。負(fù)載均衡
這個(gè)模型存在的問(wèn)題是,若是M1和M2分別發(fā)送到兩臺(tái)Server上,就不能保證M1先達(dá)到,也就不能保證M1被先消費(fèi),那么就須要在MQ Server集群維護(hù)消息的順序。那么如何解決?一種簡(jiǎn)單的方式就是將M一、M2發(fā)送到同一個(gè)Server上:分布式
保證消息順序,你改進(jìn)后的方法ide
這樣能夠保證M1先于M2到達(dá)MQServer(客戶(hù)端等待M1成功后再發(fā)送M2),根據(jù)先達(dá)到先被消費(fèi)的原則,M1會(huì)先于M2被消費(fèi),這樣就保證了消息的順序。svg
這個(gè)模型,理論上能夠保證消息的順序,但在實(shí)際運(yùn)用中你應(yīng)該會(huì)遇到下面的問(wèn)題:性能
網(wǎng)絡(luò)延遲問(wèn)題
只要將消息從一臺(tái)服務(wù)器發(fā)往另外一臺(tái)服務(wù)器,就會(huì)存在網(wǎng)絡(luò)延遲問(wèn)題。如上圖所示,若是發(fā)送M1耗時(shí)大于發(fā)送M2的耗時(shí),那么M2就先被消費(fèi),仍然不能保證消息的順序。即便M1和M2同時(shí)到達(dá)消費(fèi)端,因?yàn)椴磺宄M(fèi)端1和消費(fèi)端2的負(fù)載狀況,仍然有可能出現(xiàn)M2先于M1被消費(fèi)。如何解決這個(gè)問(wèn)題?將M1和M2發(fā)往同一個(gè)消費(fèi)者便可,且發(fā)送M1后,須要消費(fèi)端響應(yīng)成功后才能發(fā)送M2。
但又會(huì)引入另一個(gè)問(wèn)題,若是發(fā)送M1后,消費(fèi)端1沒(méi)有響應(yīng),那是繼續(xù)發(fā)送M2呢,仍是從新發(fā)送M1?通常為了保證消息必定被消費(fèi),確定會(huì)選擇重發(fā)M1到另一個(gè)消費(fèi)端2,就以下圖所示。
保證消息順序的正確姿式
這樣的模型就嚴(yán)格保證消息的順序,細(xì)心的你仍然會(huì)發(fā)現(xiàn)問(wèn)題,消費(fèi)端1沒(méi)有響應(yīng)Server時(shí)有兩種狀況,一種是M1確實(shí)沒(méi)有到達(dá),另一種狀況是消費(fèi)端1已經(jīng)響應(yīng),可是Server端沒(méi)有收到。若是是第二種狀況,重發(fā)M1,就會(huì)形成M1被重復(fù)消費(fèi)。也就是咱們后面要說(shuō)的第二個(gè)問(wèn)題,消息重復(fù)問(wèn)題。
回過(guò)頭來(lái)看消息順序問(wèn)題,嚴(yán)格的順序消息很是容易理解,并且處理問(wèn)題也比較容易,要實(shí)現(xiàn)嚴(yán)格的順序消息,簡(jiǎn)單且可行的辦法就是:
保證生產(chǎn)者 - MQServer - 消費(fèi)者是一對(duì)一對(duì)一的關(guān)系
可是這樣設(shè)計(jì),并行度就成為了消息系統(tǒng)的瓶頸(吞吐量不夠),也會(huì)致使更多的異常處理,好比:只要消費(fèi)端出現(xiàn)問(wèn)題,就會(huì)致使整個(gè)處理流程阻塞,咱們不得不花費(fèi)更多的精力來(lái)解決阻塞的問(wèn)題。
但咱們的最終目標(biāo)是要集群的高容錯(cuò)性和高吞吐量。這彷佛是一對(duì)不可調(diào)和的矛盾,那么阿里是如何解決的?
世界上解決一個(gè)計(jì)算機(jī)問(wèn)題最簡(jiǎn)單的方法:“剛好”不須要解決它!
有些問(wèn)題,看起來(lái)很重要,但實(shí)際上咱們能夠經(jīng)過(guò)合理的設(shè)計(jì)或者將問(wèn)題分解來(lái)規(guī)避。若是硬要把時(shí)間花在解決它們身上,其實(shí)是浪費(fèi)的,效率低下的。從這個(gè)角度來(lái)看消息的順序問(wèn)題,咱們能夠得出兩個(gè)結(jié)論:
一、不關(guān)注亂序的應(yīng)用實(shí)際大量存在
二、隊(duì)列無(wú)序并不意味著消息無(wú)序
最后咱們從源碼角度分析RocketMQ怎么實(shí)現(xiàn)發(fā)送順序消息。
通常消息是經(jīng)過(guò)輪詢(xún)?nèi)筷?duì)列來(lái)發(fā)送的(負(fù)載均衡策略),順序消息能夠根據(jù)業(yè)務(wù),好比說(shuō)訂單號(hào)相同的消息發(fā)送到同一個(gè)隊(duì)列。下面的示例中,OrderId相同的消息,會(huì)發(fā)送到同一個(gè)隊(duì)列:
// RocketMQ默認(rèn)提供了兩種MessageQueueSelector實(shí)現(xiàn):隨機(jī)/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
在獲取到路由信息之后,會(huì)根據(jù)MessageQueueSelector實(shí)現(xiàn)的算法來(lái)選擇一個(gè)隊(duì)列,同一個(gè)OrderId獲取到的隊(duì)列是同一個(gè)隊(duì)列。
private SendResult send() {
// 獲取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根據(jù)咱們的算法,選擇一個(gè)發(fā)送隊(duì)列
// 這里的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}
2、消息重復(fù)
上面在解決消息順序問(wèn)題時(shí),引入了一個(gè)新的問(wèn)題,就是消息重復(fù)。那么RocketMQ是怎樣解決消息重復(fù)的問(wèn)題呢?仍是“剛好”不解決。
形成消息的重復(fù)的根本緣由是:網(wǎng)絡(luò)不可達(dá)。只要經(jīng)過(guò)網(wǎng)絡(luò)交換數(shù)據(jù),就沒(méi)法避免這個(gè)問(wèn)題。因此解決這個(gè)問(wèn)題的辦法就是不解決,轉(zhuǎn)而繞過(guò)這個(gè)問(wèn)題。那么問(wèn)題就變成了:若是消費(fèi)端收到兩條同樣的消息,應(yīng)該怎樣處理?
一、消費(fèi)端處理消息的業(yè)務(wù)邏輯保持冪等性
二、保證每條消息都有惟一編號(hào)且保證消息處理成功與去重表的日志同時(shí)出現(xiàn)
第1條很好理解,只要保持冪等性,無(wú)論來(lái)多少條重復(fù)消息,最后處理的結(jié)果都同樣。第2條原理就是利用一張日志表來(lái)記錄已經(jīng)處理成功的消息的ID,若是新到的消息ID已經(jīng)在日志表中,那么就再也不處理這條消息。
咱們能夠看到第1條的解決方式,很明顯應(yīng)該在消費(fèi)端實(shí)現(xiàn),不屬于消息系統(tǒng)要實(shí)現(xiàn)的功能。第2條能夠消息系統(tǒng)實(shí)現(xiàn),也能夠業(yè)務(wù)端實(shí)現(xiàn)。正常狀況下出現(xiàn)重復(fù)消息的幾率不必定大,且由消息系統(tǒng)實(shí)現(xiàn)的話(huà),確定會(huì)對(duì)消息系統(tǒng)的吞吐量和高可用有影響,因此最好仍是由業(yè)務(wù)端本身處理消息重復(fù)的問(wèn)題,這也是RocketMQ不解決消息重復(fù)的問(wèn)題的緣由。
RocketMQ不保證消息不重復(fù),若是你的業(yè)務(wù)須要保證嚴(yán)格的不重復(fù)消息,須要你本身在業(yè)務(wù)端去重。
總結(jié)
以上是生活随笔為你收集整理的rocketmq怎么保证数据不会重复_RocketMQ保证信息有序性和防止重复的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 设树采用孩子兄弟表示法存放.用类c语言设
- 下一篇: python编写加密程序_用Python