Storm的ack机制在项目应用中的坑
正在學(xué)習(xí)storm的大兄弟們,我又來傳道授業(yè)解惑了,是不是覺得自己會(huì)用ack了。好吧,那就讓我開始啪啪打你們臉吧。
先說一下ACK機(jī)制:
為了保證數(shù)據(jù)能正確的被處理, 對(duì)于spout產(chǎn)生的每一個(gè)tuple, storm都會(huì)進(jìn)行跟蹤。
這里面涉及到ack/fail的處理,如果一個(gè)tuple處理成功是指這個(gè)Tuple以及這個(gè)Tuple產(chǎn)生的所有Tuple都被成功處理, 會(huì)調(diào)用spout的ack方法;
如果失敗是指這個(gè)Tuple或這個(gè)Tuple產(chǎn)生的所有Tuple中的某一個(gè)tuple處理失敗, 則會(huì)調(diào)用spout的fail方法;
在處理tuple的每一個(gè)bolt都會(huì)通過OutputCollector來告知storm, 當(dāng)前bolt處理是否成功。
另外需要注意的,當(dāng)spout觸發(fā)fail動(dòng)作時(shí),不會(huì)自動(dòng)重發(fā)失敗的tuple,需要我們?cè)趕pout中重新獲取發(fā)送失敗數(shù)據(jù),手動(dòng)重新再發(fā)送一次。
Ack原理
Storm中有個(gè)特殊的task名叫acker,他們負(fù)責(zé)跟蹤spout發(fā)出的每一個(gè)Tuple的Tuple樹(因?yàn)橐粋€(gè)tuple通過spout發(fā)出了,經(jīng)過每一個(gè)bolt處理后,會(huì)生成一個(gè)新的tuple發(fā)送出去)。當(dāng)acker(框架自啟動(dòng)的task)發(fā)現(xiàn)一個(gè)Tuple樹已經(jīng)處理完成了,它會(huì)發(fā)送一個(gè)消息給產(chǎn)生這個(gè)Tuple的那個(gè)task。
Acker的跟蹤算法是Storm的主要突破之一,對(duì)任意大的一個(gè)Tuple樹,它只需要恒定的20字節(jié)就可以進(jìn)行跟蹤。
Acker跟蹤算法的原理:acker對(duì)于每個(gè)spout-tuple保存一個(gè)ack-val的校驗(yàn)值,它的初始值是0,然后每發(fā)射一個(gè)Tuple或Ack一個(gè)Tuple時(shí),這個(gè)Tuple的id就要跟這個(gè)校驗(yàn)值異或一下,并且把得到的值更新為ack-val的新值。那么假設(shè)每個(gè)發(fā)射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根據(jù)ack-val是否為0來判斷是否完全處理,如果為0則認(rèn)為已完全處理。
要實(shí)現(xiàn)ack機(jī)制:
1,spout發(fā)射tuple的時(shí)候指定messageId
2,spout要重寫B(tài)aseRichSpout的fail和ack方法
3,spout對(duì)發(fā)射的tuple進(jìn)行緩存(否則spout的fail方法收到acker發(fā)來的messsageId,spout也無法獲取到發(fā)送失敗的數(shù)據(jù)進(jìn)行重發(fā)),看看系統(tǒng)提供的接口,只有msgId這個(gè)參數(shù),這里的設(shè)計(jì)不合理,其實(shí)在系統(tǒng)里是有cache整個(gè)msg的,只給用戶一個(gè)messageid,用戶如何取得原來的msg貌似需要自己cache,然后用這個(gè)msgId去查詢,太坑爹了
3,spout根據(jù)messageId對(duì)于ack的tuple則從緩存隊(duì)列中刪除,對(duì)于fail的tuple可以選擇重發(fā)。
4,設(shè)置acker數(shù)至少大于0;Config.setNumAckers(conf, ackerParal);
Storm的Bolt有BsicBolt和RichBolt:
在BasicBolt中,BasicOutputCollector在emit數(shù)據(jù)的時(shí)候,會(huì)自動(dòng)和輸入的tuple相關(guān)聯(lián),而在execute方法結(jié)束的時(shí)候那個(gè)輸入tuple會(huì)被自動(dòng)ack。
使用RichBolt需要在emit數(shù)據(jù)的時(shí)候,顯示指定該數(shù)據(jù)的源tuple要加上第二個(gè)參數(shù)anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple, newTuple);并且需要在execute執(zhí)行成功后調(diào)用OutputCollector.ack(tuple), 當(dāng)失敗處理時(shí),執(zhí)行OutputCollector.fail(tuple);
由一個(gè)tuple產(chǎn)生一個(gè)新的tuple稱為:anchoring,你發(fā)射一個(gè)tuple的同時(shí)也就完成了一次anchoring。
ack機(jī)制即,spout發(fā)送的每一條消息,在規(guī)定的時(shí)間內(nèi),spout收到Acker的ack響應(yīng),即認(rèn)為該tuple 被后續(xù)bolt成功處理;在規(guī)定的時(shí)間內(nèi)(默認(rèn)是30秒),沒有收到Acker的ack響應(yīng)tuple,就觸發(fā)fail動(dòng)作,即認(rèn)為該tuple處理失敗,timeout時(shí)間可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設(shè)定。或者收到Acker發(fā)送的fail響應(yīng)tuple,也認(rèn)為失敗,觸發(fā)fail動(dòng)作
注意,我開始以為如果繼承BaseBasicBolt那么程序拋出異常,也會(huì)讓spout進(jìn)行重發(fā),但是我錯(cuò)了,程序直接異常停止了
這里我以分布式程序入門案例worldcount為例子吧。請(qǐng)看下面大屏幕:沒有錯(cuò)我就是那個(gè)你們走在路上經(jīng)常聽見的名字劉洋。
這里spout1-1task發(fā)送句子"i am liu yang"給bolt2-2task進(jìn)行處理,該task把句子切分為單詞,根據(jù)字段分發(fā)到下一個(gè)bolt中,bolt2-2,bolt4-4,bolt5-5對(duì)每一個(gè)單詞添加一個(gè)后綴1后再發(fā)送給下一個(gè)bolt進(jìn)行存儲(chǔ)到數(shù)據(jù)庫(kù)的操作,這個(gè)時(shí)候bolt7-7task在存儲(chǔ)數(shù)據(jù)到數(shù)據(jù)庫(kù)時(shí)失敗,向spout發(fā)送fail響應(yīng),這個(gè)時(shí)候spout收到消息就會(huì)再次發(fā)送的該數(shù)據(jù)。
好,那么我思考一個(gè)問題:spout如何保證再次發(fā)送的數(shù)據(jù)就是之前失敗的數(shù)據(jù),所以在spout實(shí)例中,絕對(duì)要定義一個(gè)map緩存,緩存發(fā)出去的每一條數(shù)據(jù),key當(dāng)然就是messageId,當(dāng)spout實(shí)例收到所有bolt的響應(yīng)后如果是ack,就會(huì)調(diào)用我們重寫的ack方法,在這個(gè)方法里面我們就要根據(jù)messageId刪除這個(gè)key-value,如果spout實(shí)例收到所有bolt響應(yīng)后,發(fā)現(xiàn)是faile,則會(huì)調(diào)用我們重寫的fail方法,根據(jù)messageId查詢到對(duì)應(yīng)的數(shù)據(jù)再次發(fā)送該數(shù)據(jù)出去。
spout代碼如下
public class MySpout extends BaseRichSpout {private static final long serialVersionUID = 5028304756439810609L;// key:messageId,Dataprivate HashMap<String, String> waitAck = new HashMap<String, String>();private SpoutOutputCollector collector;public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("sentence"));}public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}public void nextTuple() {String sentence = "i am liu yang";String messageId = UUID.randomUUID().toString().replaceAll("-", "");waitAck.put(messageId, sentence);//指定messageId,開啟ackfail機(jī)制collector.emit(new Values(sentence), messageId);}@Overridepublic void ack(Object msgId) {System.out.println("消息處理成功:" + msgId);System.out.println("刪除緩存中的數(shù)據(jù)...");waitAck.remove(msgId);}@Overridepublic void fail(Object msgId) {System.out.println("消息處理失敗:" + msgId);System.out.println("重新發(fā)送失敗的信息...");//重發(fā)如果不開啟ackfail機(jī)制,那么spout的map對(duì)象中的該數(shù)據(jù)不會(huì)被刪除的。collector.emit(new Values(waitAck.get(msgId)),msgId);} }雖然在storm項(xiàng)目中我們的spout源通常來源kafka,而且我們使用storm提供的工具類KafkaSpout類,其實(shí)這個(gè)類里面就維護(hù)者<messageId,Tuple>對(duì)的集合。
Storm怎么處理重復(fù)的tuple?
因?yàn)镾torm要保證tuple的可靠處理,當(dāng)tuple處理失敗或者超時(shí)的時(shí)候,spout會(huì)fail并重新發(fā)送該tuple,那么就會(huì)有tuple重復(fù)計(jì)算的問題。這個(gè)問題是很難解決的,storm也沒有提供機(jī)制幫助你解決。一些可行的策略:
(1)不處理,這也算是種策略。因?yàn)閷?shí)時(shí)計(jì)算通常并不要求很高的精確度,后續(xù)的批處理計(jì)算會(huì)更正實(shí)時(shí)計(jì)算的誤差。
(2)使用第三方集中存儲(chǔ)來過濾,比如利用mysql,memcached或者redis根據(jù)邏輯主鍵來去重。
(3)使用bloom filter做過濾,簡(jiǎn)單高效。
問題一:你們有沒有想過如果某一個(gè)task節(jié)點(diǎn)處理的tuple一直失敗,消息一直重發(fā)會(huì)怎么樣?
我們都知道,spout作為消息的發(fā)送源,在沒有收到該tuple來至左右bolt的返回信息前,是不會(huì)刪除的,那么如果消息一直失敗,就會(huì)導(dǎo)致spout節(jié)點(diǎn)存儲(chǔ)的tuple數(shù)據(jù)越來越多,導(dǎo)致內(nèi)存溢出。
問題二:有沒有想過,如果該tuple的眾多子tuple中,某一個(gè)子tuple處理failed了,但是另外的子tuple仍然會(huì)繼續(xù)執(zhí)行,如果子tuple都是執(zhí)行數(shù)據(jù)存儲(chǔ)操作,那么就算整個(gè)消息失敗,那些生成的子tuple還是會(huì)成功執(zhí)行而不會(huì)回滾的。
這個(gè)時(shí)候storm的原生api是無法支持這種事務(wù)性操作,我們可以使用storm提供的高級(jí)api-trident來做到(具體如何我不清楚,目前沒有研究它,但是我可以它內(nèi)部一定是根據(jù)分布式協(xié)議比如兩階段提交協(xié)議等)。向這種業(yè)務(wù)中要保證事務(wù)性功能,我們完全可以根據(jù)我們自身的業(yè)務(wù)來做到,比如這里的入庫(kù)操作,我們先記錄該消息是否已經(jīng)入庫(kù)的狀態(tài),再入庫(kù)時(shí)查詢狀態(tài)來決定是否給予執(zhí)行。
問題三:tuple的追蹤并不一定要是從spout結(jié)點(diǎn)到最后一個(gè)bolt,只要是spout開始,可以在任意層次bolt停止追蹤做出應(yīng)答。
Acker task 組件來設(shè)置一個(gè)topology里面的acker的數(shù)量,默認(rèn)值是一,如果你的topoogy里面的tuple比較多的話,那么請(qǐng)把a(bǔ)cker的數(shù)量設(shè)置多一點(diǎn),效率會(huì)更高一點(diǎn)。
調(diào)整可靠性?
acker task是非常輕量級(jí)的,?所以一個(gè)topology里面不需要很多acker。你可以通過Strom UI(id: -1)來跟蹤它的性能。?如果它的吞吐量看起來不正常,那么你就需要多加點(diǎn)acker了。
如果可靠性對(duì)你來說不是那么重要?—?你不太在意在一些失敗的情況下?lián)p失一些數(shù)據(jù),?那么你可以通過不跟蹤這些tuple樹來獲取更好的性能。不去跟蹤消息的話會(huì)使得系統(tǒng)里面的消息數(shù)量減少一半,?因?yàn)閷?duì)于每一個(gè)tuple都要發(fā)送一個(gè)ack消息。并且它需要更少的id來保存下游的tuple,?減少帶寬占用。
有三種方法可以去掉可靠性。
第一是把Config.TOPOLOGY_ACKERS?設(shè)置成?0.?在這種情況下,?storm會(huì)在spout發(fā)射一個(gè)tuple之后馬上調(diào)用spout的ack方法。也就是說這個(gè)tuple樹不會(huì)被跟蹤。
第二個(gè)方法是在tuple層面去掉可靠性。?你可以在發(fā)射tuple的時(shí)候不指定messageid來達(dá)到不跟粽某個(gè)特定的spout tuple的目的。
最后一個(gè)方法是如果你對(duì)于一個(gè)tuple樹里面的某一部分到底成不成功不是很關(guān)心,那么可以在發(fā)射這些tuple的時(shí)候unanchor它們。?這樣這些tuple就不在tuple樹里面,?也就不會(huì)被跟蹤了。
可靠性配置
有三種方法可以去掉消息的可靠性:
將參數(shù)Config.TOPOLOGY_ACKERS設(shè)置為0,通過此方法,當(dāng)Spout發(fā)送一個(gè)消息的時(shí)候,它的ack方法將立刻被調(diào)用;
Spout發(fā)送一個(gè)消息時(shí),不指定此消息的messageID。當(dāng)需要關(guān)閉特定消息可靠性的時(shí)候,可以使用此方法;
最后,如果你不在意某個(gè)消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發(fā)送時(shí)不要做錨定,即在emit方法中不指定輸入消息。因?yàn)檫@些子孫消息沒有被錨定在任何tuple tree中,因此他們的失敗不會(huì)引起任何spout重新發(fā)送消息。
如何關(guān)閉Ack機(jī)制
有2種途徑
spout發(fā)送數(shù)據(jù)是不帶上msgid
設(shè)置acker數(shù)等于0
值得注意的一點(diǎn)是Storm調(diào)用Ack或者fail的task始終是產(chǎn)生這個(gè)tuple的那個(gè)task,所以如果一個(gè)Spout,被分為很多個(gè)task來執(zhí)行,消息執(zhí)行的成功失敗與否始終會(huì)通知最開始發(fā)出tuple的那個(gè)task。
作為Storm的使用者,有兩件事情要做以更好的利用Storm的可靠性特征,首先你在生成一個(gè)tuple的時(shí)候要通知Storm,其次,完全處理一個(gè)tuple之后要通知Storm,這樣Storm就可以檢測(cè)到整個(gè)tuple樹有沒有完成處理,并且通知源Spout處理結(jié)果。
1 由于對(duì)應(yīng)的task掛掉了,一個(gè)tuple沒有被Ack:
Storm的超時(shí)機(jī)制在超時(shí)之后會(huì)把這個(gè)tuple標(biāo)記為失敗,從而可以重新處理。
2 Acker掛掉了: 在這種情況下,由這個(gè)Acker所跟蹤的所有spout tuple都會(huì)出現(xiàn)超時(shí),也會(huì)被重新的處理。
3 Spout 掛掉了:在這種情況下給Spout發(fā)送消息的消息源負(fù)責(zé)重新發(fā)送這些消息。
三個(gè)基本的機(jī)制,保證了Storm的完全分布式,可伸縮的并且高度容錯(cuò)的。
另外Ack機(jī)制還常用于限流作用:?為了避免spout發(fā)送數(shù)據(jù)太快,而bolt處理太慢,常常設(shè)置pending數(shù),當(dāng)spout有等于或超過pending數(shù)的tuple沒有收到ack或fail響應(yīng)時(shí),跳過執(zhí)行nextTuple, 從而限制spout發(fā)送數(shù)據(jù)。
通過conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);設(shè)置spout pend數(shù)。
轉(zhuǎn)發(fā):https://www.cnblogs.com/intsmaze/p/5918087.html
總結(jié)
以上是生活随笔為你收集整理的Storm的ack机制在项目应用中的坑的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 通过设置关联菜单建立excel记账本
- 下一篇: itoa的用法