kafkaspot在ack机制下如何保证内存不溢
storm框架中的kafkaspout類實現(xiàn)的是BaseRichSpout,它里面已經(jīng)重寫了fail和ack方法,所以我們的bolt必須實現(xiàn)ack機制,就可以保證消息的重新發(fā)送;如果不實現(xiàn)ack機制,那么kafkaspout就無法得到消息的處理響應(yīng),就會在超時以后再次發(fā)送消息,導(dǎo)致消息的重復(fù)發(fā)送。
?
但是回想一下我們自己寫一個spout類實現(xiàn)BaseRichSpout并讓他具備消息重發(fā),那么我們是會在我們的spout類里面定義一個map集合,并以msgId作為key。
public class MySpout extends BaseRichSpout {private static final long serialVersionUID = 5028304756439810609L;// key:messageId,Dataprivate HashMap<String, String> waitAck = new HashMap<String, String>();private SpoutOutputCollector collector;public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("sentence"));}public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}public void nextTuple() {String sentence = "the cow jumped over the moon";String messageId = UUID.randomUUID().toString().replaceAll("-", "");waitAck.put(messageId, sentence);//指定messageId,開啟ackfail機制collector.emit(new Values(sentence), messageId);}@Overridepublic void ack(Object msgId) {System.out.println("消息處理成功:" + msgId);System.out.println("刪除緩存中的數(shù)據(jù)...");waitAck.remove(msgId);}@Overridepublic void fail(Object msgId) {System.out.println("消息處理失敗:" + msgId);System.out.println("重新發(fā)送失敗的信息...");//重發(fā)如果不開啟ackfail機制,那么spout的map對象中的該數(shù)據(jù)不會被刪除的,而且下游collector.emit(new Values(waitAck.get(msgId)),msgId);} }那么kafkaspout會不會也是這樣還保存這已發(fā)送未收到bolt響應(yīng)的消息呢?如果這樣,如果消息處理不斷失敗,不斷重發(fā),消息不斷積累在kafkaspout節(jié)點上,kafkaspout端會不就會出現(xiàn)內(nèi)存溢出?
?
其實并沒有,回想kafka的原理,Kafka會為每一個consumergroup保留一些metadata信息–當(dāng)前消費的消息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條消息后線性增加這個offset。當(dāng)然,consumer也可將offset設(shè)成一個較小的值,重新消費一些消息。也就是說,kafkaspot在消費kafka的數(shù)據(jù)是,通過offset讀取到消息并發(fā)送給bolt后,kafkaspot只是保存者當(dāng)前的offset值。
當(dāng)失敗或成功根據(jù)msgId查詢offset值,然后再去kafka消費該數(shù)據(jù)來確保消息的重新發(fā)送。
?
那么雖然offset數(shù)據(jù)小,但是當(dāng)offset的數(shù)據(jù)量上去了還是會內(nèi)存溢出的?
其實并沒有,kafkaspout發(fā)現(xiàn)緩存的數(shù)據(jù)超過限制了,會把某端的數(shù)據(jù)清理掉的。
kafkaspot中發(fā)送數(shù)據(jù)的代碼
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));可以看到msgID里面包裝了offset參數(shù)。
它不緩存已經(jīng)發(fā)送出去的數(shù)據(jù)信息。
?
當(dāng)他接收到來至bolt的響應(yīng)后,會從接收到的msgId中得到offset。以下是從源碼中折取的關(guān)鍵代碼:
public void ack(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;PartitionManager m = _coordinator.getManager(id.partition);if (m != null) {m.ack(id.offset);}}m.ack(id.offset);public void ack(Long offset) {_pending.remove(offset);//處理成功移除offsetnumberAcked++;}public void fail(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;PartitionManager m = _coordinator.getManager(id.partition);if (m != null) {m.fail(id.offset);}}m.fail(id.offset);public void fail(Long offset) {failed.add(offset);//處理失敗添加offsetnumberFailed++;}SortedSet<Long> _pending = new TreeSet<Long>();SortedSet<Long> failed = new TreeSet<Long>();關(guān)于kafkaspot的源碼解析大家可以看這邊博客:http://www.cnblogs.com/cruze/p/4241181.html
源碼解析中涉及了很多kafka的概念,所以僅僅理解kafka的概念想完全理解kafkaspot源碼是很難的,如果不理解kafka概念,那么就只需要在理解storm的ack機制上明白kafkaspot做了上面的兩件事就可以了。
轉(zhuǎn)發(fā):http://www.cnblogs.com/intsmaze/p/5947078.html
總結(jié)
以上是生活随笔為你收集整理的kafkaspot在ack机制下如何保证内存不溢的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 游戏开发入行大师攻略
- 下一篇: WPF 使用自定义的TTF字体