trident State应用指南
trident State應(yīng)用指南
@(STORM)[storm, 大數(shù)據(jù)]
- trident State應(yīng)用指南
- 一State基礎(chǔ)示例
- 1主類(lèi)
- 2Aggregator的用法
- 1Aggregator接口
- 2init方法
- 3aggregate方法
- 4complete方法
- 3state的用法
- 1拓?fù)涠x
- 2工廠類(lèi)NameSumStateFactory
- 3更新類(lèi)NameSumUpdater
- 4狀態(tài)類(lèi)NameSumState
- 4state應(yīng)用步驟總結(jié)
- 5state應(yīng)用的一些注意事項(xiàng)
- 6state與MapState的差異
- 二MapState
- 1persistentAggregate
- 2MapStates
- 3Demo
- 1創(chuàng)建一個(gè)實(shí)現(xiàn)IBackingMap的類(lèi)實(shí)現(xiàn)multiGet和multiPut方法
- 2創(chuàng)建實(shí)現(xiàn)StateFactory的類(lèi)
- 3定義Count函數(shù)
- 4在拓?fù)渲袑?xiě)入state或者查詢(xún)state
- 4關(guān)于MapState的總結(jié)
- 1基本步驟
- 2全流程邏輯
- 3復(fù)雜的情況
- 4其它思考
- 5MapState讀寫(xiě)mysql示例
- 1MysqlMapStateFactory
- 2MysqlMapStateBacking
- 三以HBaseMapState為例分析MapState代碼調(diào)用全過(guò)程
- 零概述 MapState被調(diào)用的全流程代碼
- 1調(diào)用過(guò)程
- TOTO按著這個(gè)流程把代碼重頭讀一遍先了解ITridentBatchBolt
- 2內(nèi)容概述
- 一如何使用MapState
- 二如何實(shí)現(xiàn)一個(gè)MapStateHBaseMapState源碼分析
- 1Option內(nèi)部類(lèi)
- 2Factory內(nèi)部類(lèi)
- 1構(gòu)造函數(shù)
- 2makeState方法
- 3構(gòu)造函數(shù)
- 4返回StateFactory的方法
- 5multiGet
- 6multiPut
- 7序列化器
- 三MapState框架
- TODO補(bǔ)充各個(gè)類(lèi)的關(guān)系圖參考P323
- 1build方法
- 2構(gòu)造方法
- 3beginCommit
- TODO CachedBatchReadsMap分析
- 4commit
- 5multiGet
- 6multiPut
- 7multiUpdate
- 四storm如何調(diào)用MapState的代碼
- 1GroupedStream類(lèi)
- 零概述 MapState被調(diào)用的全流程代碼
Trident及State的原理請(qǐng)見(jiàn)另一篇文章:http://blog.csdn.net/lujinhong2/article/details/47132305
簡(jiǎn)單總結(jié):
1、最簡(jiǎn)單的情況使用IBacking的邏輯,很容易實(shí)現(xiàn)k-v格式的state。
2、如果IBacking不夠靈活(不能取得txid,不是kv而是多列的格式),則直接實(shí)現(xiàn)MapState的接口。
3、最復(fù)雜的是使用State接口,最靈活,但真有必要嗎?
第一二種方法比較:persistenceAggregate 第一個(gè)參數(shù)關(guān)鍵定義了如何去更新state(如mysql中的內(nèi)容),比如先取出數(shù)據(jù),更新txid,再寫(xiě)回去之類(lèi)的,而第二個(gè)參數(shù)定義了以什么邏輯去更新數(shù)據(jù),如求和、計(jì)算、還是平均之類(lèi)的。 因此,反正第一個(gè)參數(shù)都只是返回一個(gè)MapState對(duì)象,那使用IBacking接口還是直接使用MapState接口都可以了,只是前者作了一些txid邏輯的封裝,對(duì)應(yīng)于幾種state的類(lèi)型,因此使用方便了一點(diǎn),便事實(shí)上,它的代碼是很簡(jiǎn)單的,它就是通過(guò)判斷txid的關(guān)系來(lái)定義了update是如何使用get和put的,所以,可以直接實(shí)現(xiàn)MapState接口的update方法即可。
一、State基礎(chǔ)示例
trident通過(guò)spout的事務(wù)性與state的事務(wù)處理,保證了恰好一次的語(yǔ)義。這里介紹了如何使用state。
完整代碼請(qǐng)見(jiàn) https://github.com/lujinhong/tridentdemo
1、主類(lèi)
主類(lèi)定義了拓?fù)涞恼w邏輯,這個(gè)拓?fù)渫ㄟ^(guò)一個(gè)固定的spout循環(huán)產(chǎn)生數(shù)據(jù),然后統(tǒng)計(jì)消息中每個(gè)名字出現(xiàn)的次數(shù)。
拓?fù)渲邢葘⑾⒅械膬?nèi)容提取出來(lái)成name, age, title, tel4個(gè)field,然后通過(guò)project只保留name字段供統(tǒng)計(jì),接著按照name分區(qū)后,為每個(gè)分區(qū)進(jìn)行聚合,最后將聚合結(jié)果通過(guò)state寫(xiě)入map中。
storm.trident.Stream Origin_Stream = topology.newStream("tridentStateDemoId", spout).parallelismHint(3).shuffle().parallelismHint(3).each(new Fields("msg"), new Splitfield(),new Fields("name", "age", "title", "tel")).parallelismHint(3).project(new Fields("name")) //其實(shí)沒(méi)什么必要,上面就不需要發(fā)射BCD字段,但可以示范一下project的用法.parallelismHint(3).partitionBy(new Fields("name")); //根據(jù)name的值作分區(qū)Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue")).partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());2、Aggregator的用法
這里涉及了一些trident常用的API,但project等相對(duì)容易理解,這里只介紹partitionAggregate的用法。
再看看上面代碼中對(duì)partitionAggregate的使用:
Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(),new Fields("nameSumKey", "nameSumValue"))第一,三個(gè)參數(shù)分別表示輸入流的名稱(chēng)與輸出流的名稱(chēng)。中間的NameCountAggregator是一個(gè)Aggregator的對(duì)象,它定義了如何對(duì)輸入流進(jìn)行聚合。我們看一下它的代碼:
public class NameCountAggregator implements Aggregator<Map<String, Integer>> {private static final long serialVersionUID = -5141558506999420908L;@Overridepublic Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>();}//判斷某個(gè)名字是否已經(jīng)存在于map中,若無(wú),則put,若有,則遞增@Overridepublic void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);}}//將聚合后的結(jié)果emit出去@Overridepublic void complete(Map<String, Integer> map,TridentCollector collector) {if (map.size() > 0) {for(Entry<String, Integer> entry : map.entrySet()){System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue());collector.emit(new Values(entry.getKey(),entry.getValue()));}map.clear();} }@Overridepublic void prepare(Map conf, TridentOperationContext context) {}@Overridepublic void cleanup() {}}(1)Aggregator接口
它實(shí)現(xiàn)了Aggregator接口,這個(gè)接口有3個(gè)方法:
public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); }init方法:在處理batch之前被調(diào)用。init的返回值是一個(gè)表示聚合狀態(tài)的對(duì)象,該對(duì)象會(huì)被傳遞到aggregate和complete方法。
aggregate方法:為每個(gè)在batch分區(qū)的輸入元組所調(diào)用,更新?tīng)顟B(tài)
complete方法:當(dāng)batch分區(qū)的所有元組已經(jīng)被aggregate方法處理完后被調(diào)用。
除了實(shí)現(xiàn)Aggregator接口,還可以實(shí)現(xiàn)ReducerAggregator或者CombinerAggregator,它們使用更方便。詳見(jiàn)《從零開(kāi)始學(xué)storm》或者官方文檔
https://storm.apache.org/documentation/Trident-API-Overview.html
下面我們看一下這3個(gè)方法的實(shí)現(xiàn)。
(2)init方法
@Override public Map<String, Integer> init(Object batchId,TridentCollector collector) {return new HashMap<String, Integer>(); }僅初始化了一個(gè)HashMap對(duì)象,這個(gè)對(duì)象會(huì)作為參數(shù)傳給aggregate和complete方法。對(duì)一個(gè)batch只執(zhí)行一次。
(3)aggregate方法
aggregate方法對(duì)于batch內(nèi)的每一個(gè)tuple均執(zhí)行一次。這里將這個(gè)batch內(nèi)的名字出現(xiàn)的次數(shù)放到init方法所初始化的map中。
@Override public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) {String key=tuple.getString(0);if(map.containsKey(key)){Integer tmp=map.get(key);map.put(key, ++tmp);}else{map.put(key, 1);} }(4)complete方法
這里在complete將aggregate處理完的結(jié)果發(fā)送出去,實(shí)際上可以在任何地方emit,比如在aggregate里面。
這個(gè)方法對(duì)于一個(gè)batch也只執(zhí)行一次。
3、state的用法
(1)拓?fù)涠x
先看一下主類(lèi)中如何將結(jié)果寫(xiě)入state:
partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());它的定義為:
TridentState storm.trident.Stream.partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater)其中的第二個(gè)參數(shù)比較容易理解,就是輸入流的名稱(chēng),這里是名字與它出現(xiàn)的個(gè)數(shù)。下面先看一下Facotry。
(2)工廠類(lèi):NameSumStateFactory
很簡(jiǎn)單,它實(shí)現(xiàn)了StateFactory,只有一個(gè)方法makeState,返回一個(gè)State類(lèi)型的對(duì)象。
public class NameSumStateFactory implements StateFactory {private static final long serialVersionUID = 8753337648320982637L;@Overridepublic State makeState(Map arg0, IMetricsContext arg1, int arg2, int arg3) {return new NameSumState(); } }(3)更新類(lèi):NameSumUpdater
這個(gè)類(lèi)繼承自BaseStateUpdater,它的updateState對(duì)batch的內(nèi)容進(jìn)行處理,這里是將batch的內(nèi)容放到一個(gè)map中,然后調(diào)用setBulk方法
public class NameSumUpdater extends BaseStateUpdater<NameSumState> {private static final long serialVersionUID = -6108745529419385248L;public void updateState(NameSumState state, List<TridentTuple> tuples, TridentCollector collector) {Map<String,Integer> map=new HashMap<String,Integer>();for(TridentTuple t: tuples) {map.put(t.getString(0), t.getInteger(1));}state.setBulk(map);} }(4)狀態(tài)類(lèi):NameSumState
這是state最核心的類(lèi),它實(shí)現(xiàn)了大部分的邏輯。NameSumState實(shí)現(xiàn)了State接口:
public interface State {void beginCommit(Long txid); void commit(Long txid); }分別在提交之前與提交成功的時(shí)候調(diào)用,在這里只打印了一些信息。
另外NameSumState還定義了如何處理NameSumUpdater傳遞的消息:
public void setBulk(Map<String, Integer> map) {// 將新到的tuple累加至map中for (Entry<String, Integer> entry : map.entrySet()) {String key = entry.getKey();if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());}}System.out.println("-------");// 將map中的當(dāng)前狀態(tài)打印出來(lái)。for (Entry<String, Integer> entry : this.map.entrySet()) {String Key = entry.getKey();Integer Value = entry.getValue();System.out.println(Key + "|" + Value);} }即將NameSumUpdater傳送過(guò)來(lái)的內(nèi)容寫(xiě)入一個(gè)HashMap中,并打印出來(lái)。
此處將state記錄在一個(gè)HashMap中,如果需要記錄在其它地方,如mysql,則使用jdbc寫(xiě)入mysql代替下面的map操作即可。
事實(shí)上,這個(gè)操作不一定要在state中執(zhí)行,可以在任何類(lèi)中,但建議還是在state類(lèi)中實(shí)現(xiàn)。
4、state應(yīng)用步驟總結(jié)
partitionPersist(new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"),new NameSumUpdater());state的應(yīng)用步驟相當(dāng)簡(jiǎn)單,原理也很簡(jiǎn)單:
(1)NameSumStateFactory()指定了將結(jié)果保存在哪里,如本例中的hashset,還可以是mysql/hbase等。當(dāng)然還有更新邏輯,
(2)NameSumUpdater()指定了更新state的邏輯,如將當(dāng)前數(shù)據(jù)和原有數(shù)據(jù)相加等。
5、state應(yīng)用的一些注意事項(xiàng)
(1)使用state,你不再需要比較事務(wù)id,在數(shù)據(jù)庫(kù)中同時(shí)寫(xiě)入多個(gè)值等內(nèi)容,而是專(zhuān)注于你的邏輯實(shí)現(xiàn)
(2)除了實(shí)現(xiàn)State接口,更常用的是實(shí)現(xiàn)MapState接口,下次補(bǔ)充。
(3)在拓?fù)渲兄付薙tateFactory,這個(gè)工廠類(lèi)找到相應(yīng)的State類(lèi)。而Updater則每個(gè)批次均會(huì)調(diào)用它的方法。State中則定義了如何保存數(shù)據(jù),這里將數(shù)據(jù)保存在內(nèi)存中的一個(gè)HashMap,還可以保存在mysql, hbase等等。
(4)trident會(huì)自動(dòng)比較txid的值,如果和當(dāng)前一樣,則不更改狀態(tài),如果是當(dāng)前txid的下一個(gè)值,則更新?tīng)顟B(tài)。這種邏輯不需要用戶(hù)處理。
(5)如果需要實(shí)現(xiàn)透明事務(wù)狀態(tài),則需要保存當(dāng)前值與上一個(gè)值,在update的時(shí)候2個(gè)要同時(shí)處理。即邏輯由自己實(shí)現(xiàn)。在本例子中,大致思路是在NameSumState中創(chuàng)建2個(gè)HashMap,分別對(duì)應(yīng)當(dāng)前與上一個(gè)狀態(tài)的值,而NameSumUpdater每次更新這2個(gè)Map。
6、state與MapState的差異
(1)由上面可以看出,state需要自己指定如何更新數(shù)據(jù)
if (this.map.containsKey(key)) {this.map.put(key, this.map.get(key) + map.get(key));} else {this.map.put(key, entry.getValue());} }這里是將原有的值,加上新到的值。而MapState會(huì)根據(jù)你選擇的類(lèi)型(Transactional, Opaque, NonTransactional)定義好邏輯,只要定義如果向state中讀寫(xiě)數(shù)據(jù)即可。
(2)MapState將State的aggreate與persistent 2部分操作合在一起了,由方法名也可以看出。在State中最后2步是partitionAggregate()與partitionPersistent(),而在MapState中最后1步是persistentAggregate()
事實(shí)上,查看persistentAggregate()的實(shí)現(xiàn),它最終也是分成aggregate和persistent 2個(gè)步驟的。
二、MapState
1、persistentAggregate
Trident有另外一種更新State的方法叫做persistentAggregate。如下:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))persistentAggregate是在partitionPersist之上的另外一層抽象。它知道怎么去使用一個(gè)Trident 聚合器來(lái)更新State。在這個(gè)例子當(dāng)中,因?yàn)檫@是一個(gè)group好的stream,Trident會(huì)期待你提供的state是實(shí)現(xiàn)了MapState接口的。用來(lái)進(jìn)行g(shù)roup的字段會(huì)以key的形式存在于State當(dāng)中,聚合后的結(jié)果會(huì)以value的形式存儲(chǔ)在State當(dāng)中。MapState接口看上去如下所示:
public interface MapState<T> extends State { List<T> multiGet(List<List<Object>> keys); List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); void multiPut(List<List<Object>> keys, List<T> vals); }當(dāng)你在一個(gè)未經(jīng)過(guò)group的stream上面進(jìn)行聚合的話,Trident會(huì)期待你的state實(shí)現(xiàn)Snapshottable接口:
public interface Snapshottable<T> extends State { T get(); T update(ValueUpdater updater); void set(T o); }MemoryMapState 和 MemcachedState 都實(shí)現(xiàn)了上面的2個(gè)接口。
2、MapStates
在Trident中實(shí)現(xiàn)MapState是非常簡(jiǎn)單的,它幾乎幫你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 類(lèi)實(shí)現(xiàn)了所有相關(guān)的邏輯,包括容錯(cuò)的邏輯。你只需要將一個(gè)IBackingMap 的實(shí)現(xiàn)提供給這些類(lèi)就可以了。IBackingMap接口看上去如下所示:
public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); }OpaqueMap’s會(huì)用OpaqueValue的value來(lái)調(diào)用multiPut方法,TransactionalMap’s會(huì)提供TransactionalValue中的value,而NonTransactionalMaps只是簡(jiǎn)單的把從Topology獲取的object傳遞給multiPut。
Trident還提供了一種CachedMap類(lèi)來(lái)進(jìn)行自動(dòng)的LRU cache。
另外,Trident 提供了 SnapshottableMap 類(lèi)將一個(gè)MapState 轉(zhuǎn)換成一個(gè) Snapshottable 對(duì)象.
大家可以看看 MemcachedState的實(shí)現(xiàn),從而學(xué)習(xí)一下怎樣將這些工具組合在一起形成一個(gè)高性能的MapState實(shí)現(xiàn)。MemcachedState是允許大家選擇使用opaque transactional, transactional, 還是 non-transactional 語(yǔ)義的。
實(shí)現(xiàn)一個(gè)MapState,可以實(shí)現(xiàn)IBackingMap接口(mutliGet()/multiPut),并且實(shí)現(xiàn)StateFactory接口(makeState()),返回一個(gè)State對(duì)象,這是常見(jiàn)的用法
* 但如果有一引起高級(jí)需求,可以直接實(shí)現(xiàn)MapState接口,這樣可以覆蓋一些如beginCommit(Long txid);commit(Long txid);這些方法,還有multiUpdate()。*
3、Demo
完整代碼請(qǐng)見(jiàn) https://github.com/lujinhong/tridentdemo
- 更詳細(xì)的可以參考trident-memcached(很全面,但較舊)
https://github.com/nathanmarz/trident-memcached - 或者storm-hbase的State實(shí)現(xiàn)等
在Trident中實(shí)現(xiàn)MapState是非常簡(jiǎn)單的,它和單純的State不同點(diǎn)在于:OpaqueMap, TransactionalMap 和 NonTransactionalMap會(huì)實(shí)現(xiàn)相關(guān)的容錯(cuò)邏輯,只需為這些類(lèi)提供一個(gè)IBackingMap接口實(shí)現(xiàn),調(diào)用multiGet和multiPut方法訪問(wèn)各自的K/V值。
public interface IBackingMap<T> {List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); }詳細(xì)的步驟如下:
(1)創(chuàng)建一個(gè)實(shí)現(xiàn)IBackingMap的類(lèi),實(shí)現(xiàn)multiGet和multiPut方法
主要實(shí)現(xiàn)multiGet和multiPut的方法,實(shí)現(xiàn)如何從state中讀寫(xiě)數(shù)據(jù)。
multiGet 的參數(shù)是一個(gè)List,可以根據(jù)key來(lái)查詢(xún)數(shù)據(jù),key本身也是一個(gè)List,以方便多個(gè)值組合成key的情形。
multiPut的參數(shù)是一個(gè)List類(lèi)型的keys和一個(gè)List類(lèi)型的values,它們的size應(yīng)該是相等的,把這些值寫(xiě)入state中。
這里將k/v寫(xiě)入了一個(gè)HashMap中,如果需要寫(xiě)入mysql,則只需要使用jdbc,把db.put改為寫(xiě)入mysql即可,查詢(xún)類(lèi)似。
(2)創(chuàng)建實(shí)現(xiàn)StateFactory的類(lèi)
public class MemoryMapStateFacotry implements StateFactory{@Overridepublic State makeState(Map conf, IMetricsContext metrics,int partitionIndex, int numPartitions) {return TransactionalMap.build((IBackingMap<TransactionalValue>) new MemoryMapStateBacking());}}很簡(jiǎn)單,就返回一個(gè)實(shí)現(xiàn)了MapState接口的類(lèi)對(duì)象,通過(guò)把上面定義的MemoryMapStateBacking對(duì)象傳入TransactionalMap.build作參數(shù)即可。當(dāng)然還可以使用:
NonTransactionalMap.build(state);b OpaqueMap.build(state);(3)定義Count函數(shù)
用于說(shuō)明如果將新來(lái)的數(shù)據(jù)與原來(lái)state中的數(shù)據(jù)組合更新。這里使用了storm提供的一個(gè)工具類(lèi),它將新來(lái)到的值與原有的值相加。
public class Count implements CombinerAggregator<Long> {@Overridepublic Long init(TridentTuple tuple) {return 1L;}@Overridepublic Long combine(Long val1, Long val2) {return val1 + val2;}@Overridepublic Long zero() {return 0L;}}(4)在拓?fù)渲袑?xiě)入state,或者查詢(xún)state
//這個(gè)流程用于統(tǒng)計(jì)單詞數(shù)據(jù),結(jié)果將被保存在wordCounts中TridentState wordCounts =topology.newStream("spout1", spout).parallelismHint(16).each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new MemoryMapStateFacotry(), new Count(),new Fields("count")).parallelismHint(16);//這個(gè)流程用于查詢(xún)上面的統(tǒng)計(jì)結(jié)果topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));return topology.build();4、關(guān)于MapState的總結(jié)
(1)基本步驟
(1)創(chuàng)建一個(gè)實(shí)現(xiàn)IBackingMap的類(lèi),實(shí)現(xiàn)multiGet和multiPut方法
(2)創(chuàng)建實(shí)現(xiàn)StateFactory的類(lèi),它的makeState返回一個(gè)實(shí)現(xiàn)了MapState接口的對(duì)象,可以通過(guò):
mapState = TransactionalMap.build(_iBacking);其中_iBacking就是第一步實(shí)現(xiàn)類(lèi)的對(duì)象。當(dāng)然還可以使用
mapState = NonTransactionalMap.build(state);mapState = OpaqueMap.build(state);TransactionalMap,OpaqueMap, NonTransactionalMap已經(jīng)通過(guò)判斷txid的值實(shí)現(xiàn)了相應(yīng)的事務(wù)邏輯,以TransactionalMap為例,它的源碼中會(huì)判斷batch中的txid與state中已經(jīng)存儲(chǔ)的是否相同,或者同的話則新值等于舊值即可:
if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached)
(3)在拓?fù)渲惺褂胮ersistentAggregate寫(xiě)入state
(2)全流程邏輯
以事務(wù)型狀態(tài)為例,我們看一下整個(gè)存儲(chǔ)過(guò)程的邏輯:
* 首先,persistentAggregate收到一批數(shù)據(jù),它的第一個(gè)參數(shù)返回的是事務(wù)型的MapState
* 然后,TransactionalMap在multiUpdate中會(huì)判斷這個(gè)事務(wù)的txid與當(dāng)前state中的txid是否一致。
* 如果txid一致的話,則保持原來(lái)的值即可,如果txid不一致,則更新數(shù)值。
* 如果更新數(shù)據(jù)呢?它是拿新來(lái)的值和state中的原有的值,使用persistentAggregate中第2個(gè)參數(shù)定義的類(lèi)方法作聚合計(jì)算。
* 第一個(gè)參數(shù)關(guān)鍵定義了如何去更新state(如mysql中的內(nèi)容),比如先取出數(shù)據(jù),更新txid,再寫(xiě)回去之類(lèi)的,而第二個(gè)參數(shù)定義了以什么邏輯去更新數(shù)據(jù),如求和、計(jì)算、還是平均之類(lèi)的。* 因此,反正第一個(gè)參數(shù)都只是返回一個(gè)MapState對(duì)象,那使用IBacking接口還是直接使用MapState接口都可以了,只是前者作了一些txid邏輯的封裝,對(duì)應(yīng)于幾種state的類(lèi)型,因此使用方便了一點(diǎn),便事實(shí)上,它的代碼是很簡(jiǎn)單的,它就是通過(guò)判斷txid的關(guān)系來(lái)定義了update是如何使用get和put的,所以,可以直接實(shí)現(xiàn)MapState接口的update方法即可。
persistentAggregate的第2個(gè)參數(shù)定義了數(shù)據(jù)是如何更新的,而IBackingMap中的multiGet和multiPut只定義了如何向state中存取數(shù)據(jù)。
比如此處的Count,它會(huì)將將2個(gè)數(shù)據(jù)相加:
因此新來(lái)的統(tǒng)計(jì)次數(shù)與原有的統(tǒng)計(jì)次數(shù)加起來(lái)即是新的總和。
而對(duì)于透明事務(wù)狀態(tài),不管txid是否一致,都需要修改state,同時(shí)將當(dāng)前state保存一下,成為preState。非事務(wù)型就簡(jiǎn)單了,不管你來(lái)什么,我都直接更新。
(3)復(fù)雜的情況
當(dāng)然,如果覺(jué)得TransactionalMap,OpaqueMap, NonTransactionalMap不能滿(mǎn)足業(yè)務(wù)需求,則可以自定義一個(gè)實(shí)現(xiàn)了MapState接口的類(lèi),而不是直接使用它們。
反正這三個(gè)類(lèi)的實(shí)現(xiàn)邏輯非常簡(jiǎn)單,當(dāng)不能滿(mǎn)足業(yè)務(wù)需要時(shí),看一下源碼,然后參考它創(chuàng)建自己的類(lèi)即可,此時(shí),關(guān)鍵是multiUpdate的實(shí)現(xiàn)。
(4)其它思考
key可以是一個(gè)很復(fù)雜的List,包括多個(gè)字段。
5、MapState讀寫(xiě)mysql示例
(1)MysqlMapStateFactory
public class MysqlMapStateFactory<T> implements StateFactory {private static final long serialVersionUID = 1987523234141L;@Overridepublic State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {return TransactionalMap.build((IBackingMap<TransactionalValue>) new MysqlMapStateBacking());} }很簡(jiǎn)單,就一行,返回一個(gè)IBacking對(duì)象。這里使用的Transactioal,當(dāng)然還可以使用NonTransactional和Opaque。
(2)MysqlMapStateBacking
最核心的還是multiGet()和multiPut:
@Overridepublic List<TransactionalValue> multiGet(List<List<Object>> keys) {if (stmt == null) {stmt = getStatment();}List<TransactionalValue> values = new ArrayList<TransactionalValue>();for (List<Object> key : keys) {String sql = "SELECT req_count FROM edt_analysis where id='" + key.get(0) + "'";LOG.debug("============sql: " + sql);try (ResultSet rs = stmt.executeQuery(sql)) {if (rs.next()) {LOG.info("Get value:{} by key:{}", rs.getObject(1), key);values.add(derialize(rs.getObject(1)));} else {values.add(null);}} catch (SQLException e) {e.printStackTrace();}}return values;}@Overridepublic void multiPut(List<List<Object>> keys, List<TransactionalValue> vals) {if (stmt == null) {stmt = getStatment();}for (int i = 0; i < keys.size(); i++) {String sql = "replace into edt_analysis values('" + keys.get(i).get(0) + "','" + serialize(vals.get(i))+ "')";LOG.debug("===================put sql " + sql);try {stmt.execute(sql);} catch (SQLException e) {e.printStackTrace();}}}但mysql與redis之類(lèi)的不同,它需要將一個(gè)TransactionalValue對(duì)象轉(zhuǎn)換為mysql中的一行數(shù)據(jù),同理,需要將mysql中的一行數(shù)據(jù)轉(zhuǎn)換為一個(gè)TransactionalValue對(duì)象:
// 將數(shù)據(jù)庫(kù)中的varchar轉(zhuǎn)換為T(mén)ransactionalValue對(duì)象 private TransactionalValue derialize(Object object) {String value[] = object.toString().split(",");return new TransactionalValue(Long.parseLong(value[0]), Long.parseLong(value[1])); }// 將TransactionalValue轉(zhuǎn)換為String private String serialize(TransactionalValue transactionalValue) {return transactionalValue.getTxid() + "," + transactionalValue.getVal(); }這是使用了最簡(jiǎn)單的方式,只有2列,一行是key,一列是value,value中保存了txid及真實(shí)的value,之間以逗號(hào)分隔。
三、以HBaseMapState為例分析MapState代碼調(diào)用全過(guò)程
(零)概述 & MapState被調(diào)用的全流程代碼
1、調(diào)用過(guò)程
(1)SubtopologyBolt implements ITridentBatchBolt這個(gè)bolt在完成一個(gè)batch的處理后會(huì)調(diào)用finishBatch(BatchInfo batchInfo)
(2)然后調(diào)用PartitionPersistProcessor implements TridentProcessor這個(gè)處理器的finishBatch(ProcessorContext processorContext)
(3)接著調(diào)用MapCombinerAggStateUpdater implements StateUpdater<MapState>的updateState(MapState map, List<TridentTuple> tuples, TridentCollector collector)
(4)再接著調(diào)用TransactionalMap<T> implements MapState<T>的 multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters)
(5)最后就是調(diào)用用戶(hù)定義的MapState類(lèi)(如HBaseMapState)的multiGet()和multiPut()方法了。
TOTO:按著這個(gè)流程把代碼重頭讀一遍,先了解ITridentBatchBolt。
簡(jiǎn)單的說(shuō)就是一個(gè)blot被處理完后,會(huì)調(diào)用finishBatch()方法,然后這個(gè)方法會(huì)調(diào)用MapState()框架的updateState(),接著調(diào)用mutliUpdate(),最后調(diào)用用戶(hù)定義的multiGet()和multiPut()。
2、內(nèi)容概述
本部分我們將以HBaseMapState為例,介紹使用MapState保證數(shù)據(jù)完整唯一的全流程代碼調(diào)用,主要分成這幾個(gè)部分:
(1)我們先介紹用戶(hù)如何在構(gòu)建代碼中使用這個(gè)MapState
(2)然后介紹HBaseMapState的源代碼,這也是用戶(hù)需要實(shí)現(xiàn)一個(gè)MapState的基本方法。
(3)接著介紹MapState框架如何調(diào)用用戶(hù)定義的代碼形成事務(wù)性。
(4)最后介紹storm的內(nèi)部機(jī)制,如何調(diào)用MapState。
這也是用戶(hù)如何要看源碼的逐步深入的過(guò)程。
(一)如何使用MapState
詳細(xì)DEMO請(qǐng)見(jiàn):https://github.com/lujinhong/stormhbasedemo
1、指定一些配置
HBaseMapState.Options option = new HBaseMapState.Options();option.tableName = "ljhtest2";option.columnFamily = "f1";option.mapMapper = new SimpleTridentHBaseMapMapper("ms");SimpleTridentHBaseMapMapper主要用于獲取Rowkey和qualifier。Option的完整選項(xiàng)見(jiàn)下面的源碼分析。
2、指定state
topology.newStream("kafka", kafkaSpout).shuffle().each(new Fields("str"), new WordSplit(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);這里使用Option對(duì)象來(lái)構(gòu)建一個(gè)HBaseMapStateFactory。
還可以通過(guò)
分別創(chuàng)建非事務(wù)與透明的state。
這里使用了storm內(nèi)建的Count()方法,如果使用Sum,用法如下:
.persistentAggregate(HBaseMapState.transactional(option), new Fields("cash"), new Sum(), new Fields("state")).parallelismHint(1);當(dāng)然還可以自定義方法,這里自定義方法也就可以自定義保存在hbase的數(shù)據(jù)類(lèi)型了。
(二)如何實(shí)現(xiàn)一個(gè)MapState:HBaseMapState源碼分析
HBaseMapState的主要代碼都在HBaseMapState類(lèi)中。一個(gè)MapState的實(shí)現(xiàn)關(guān)鍵在于
* 構(gòu)建一個(gè)實(shí)現(xiàn)StateFactory的類(lèi),實(shí)現(xiàn)makeState() 方法,返回一個(gè)State對(duì)象。
* 一個(gè)MapState,實(shí)現(xiàn)IBackingMap接口的multiGet()和multiPut(),指定如何從hbase中讀寫(xiě)數(shù)據(jù)。
關(guān)于mapstate的基礎(chǔ)介紹請(qǐng)參考上面。
1、Option內(nèi)部類(lèi)
HBaseMapState有一個(gè)內(nèi)部類(lèi):Option,用于指定一些配置項(xiàng)。
public static class Options<T> implements Serializable {public Serializer<T> serializer = null;public int cacheSize = 5000;public String globalKey = "$HBASE_STATE_GLOBAL$";public String configKey = "hbase.config";public String tableName;public String columnFamily;public TridentHBaseMapMapper mapMapper; }分別意思為:
* 序列化器,即以什么格式寫(xiě)入hbase,storm-hbase自帶了JSON格式的序列化實(shí)現(xiàn)。
* 緩沖大小
* 未知
* 指定hbase-site.xml位置的變量
* 表名
* family名
* 用于獲取rowkey和qualifier,創(chuàng)建對(duì)象時(shí)需要指定一個(gè)參數(shù)作為qualifier。
2、Factory內(nèi)部類(lèi)
(1)構(gòu)造函數(shù)
構(gòu)造函數(shù)接收2個(gè)參數(shù),分別為state的類(lèi)型以及Option對(duì)象。
除些以外,還指定了序列化器:
(2)makeState()方法
就是返回一個(gè)State對(duì)象。
3、構(gòu)造函數(shù)
構(gòu)造函數(shù)用于加載配置文件,安全機(jī)制等。
4、返回StateFactory的方法
沒(méi)什么好介紹的,就是返回各種類(lèi)型的staStateFactory,具體的說(shuō)就是返回上面Factory的一個(gè)對(duì)象。這里只保留了透明型的。
@SuppressWarnings("rawtypes") public static StateFactory opaque() {Options<OpaqueValue> options = new Options<OpaqueValue>();return opaque(options); }@SuppressWarnings("rawtypes") public static StateFactory opaque(Options<OpaqueValue> opts) {return new Factory(StateType.OPAQUE, opts); }5、multiGet
根據(jù)一個(gè)List<List<Object>> keys列表獲取到一個(gè)返回值的列表List。注意key本身也是一個(gè)List<Object>。
代碼主要是三部分:
(1)創(chuàng)建List<Get> gets
(2)查詢(xún)hbase:根據(jù)gets獲取Result[]
List<T> retval = new ArrayList<T>();Result[] results = this.table.get(gets);(3)將results封裝成一個(gè)List<T> retval并返回
for (int i = 0; i < keys.size(); i++) {String qualifier = this.options.mapMapper.qualifier(keys.get(i));Result result = results[i];byte[] value = result.getValue(this.options.columnFamily.getBytes(), qualifier.getBytes());if(value != null) {retval.add(this.serializer.deserialize(value));} else {retval.add(null);}} return retval;當(dāng)返回值為空時(shí),則加上null。
6、multiPut
它將一個(gè)List<List<Object>> keys, List<T> values的數(shù)據(jù)寫(xiě)入hbase,注意keys.size()與values.size()必須相等。
List<Put> puts = new ArrayList<Put>(keys.size());for (int i = 0; i < keys.size(); i++) {byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i));String qualifier = this.options.mapMapper.qualifier(keys.get(i));LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))});Put put = new Put(hbaseKey);T val = values.get(i);put.add(this.options.columnFamily.getBytes(),qualifier.getBytes(),this.serializer.serialize(val));puts.add(put);this.table.put(puts);7、序列化器
序列化器指定了以何種格式將數(shù)據(jù)寫(xiě)入hbase(序列化),以及取出數(shù)據(jù)后如何進(jìn)行解釋(反序列化),即關(guān)鍵是serialize()與deserialize()這2個(gè)方法。
storm默認(rèn)提供了json的實(shí)現(xiàn),以Transactional為例:
public class JSONTransactionalSerializer implements Serializer<TransactionalValue>它的內(nèi)部只有2個(gè)方法:
@Override public byte[] serialize(TransactionalValue obj) {List toSer = new ArrayList(2);toSer.add(obj.getTxid());toSer.add(obj.getVal());try {return JSONValue.toJSONString(toSer).getBytes("UTF-8");} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);} }它將一個(gè)TransactionalValue轉(zhuǎn)化為json格式,TransactionalValue只有2個(gè)變量,是一個(gè)典型的bean:
T val; Long txid;而另一個(gè)方法deserialize()則剛好相反,它將一個(gè)json格式字節(jié)流解釋為一個(gè)TransactionalValue對(duì)象:
@Override public TransactionalValue deserialize(byte[] b) {try {String s = new String(b, "UTF-8");List deser = (List) JSONValue.parse(s);return new TransactionalValue((Long) deser.get(0), deser.get(1));} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);} }(三)MapState框架
//TODO:補(bǔ)充各個(gè)類(lèi)的關(guān)系圖,參考P323
上述介紹了用戶(hù)如何通過(guò)實(shí)現(xiàn)IBackingMap接口來(lái)創(chuàng)建自己的MapState實(shí)現(xiàn),這里我們將介紹MapState框架是如何調(diào)用用戶(hù)寫(xiě)的mutliGet()和multiPut方法的。
* 另外,如果上述實(shí)現(xiàn)iBackingMap的方法不能滿(mǎn)足你的要求,你可以實(shí)現(xiàn)自己的MapState框架,按照這里介紹的方法即可 *
我們主要以Transactional為例,再簡(jiǎn)單介紹一下NonTransactional和Opaque的情形。在上面的Factory.makeState()方法中:
IBackingMap state = new HBaseMapState(options, conf, partitionIndex); mapState = TransactionalMap.build(state);state就是用戶(hù)代碼定義的MapState實(shí)現(xiàn),此此處是HBaseMapState。我們下面看一下TransactionalMap是如何調(diào)用HBaseMapState的mutliGet()和multiPut方法的。
1、build()方法
我們從build方法開(kāi)始,因?yàn)檫@是用戶(hù)創(chuàng)建MapState所調(diào)用的API。
public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) {return new TransactionalMap<T>(backing); }它使用用戶(hù)定義的IBackingMap對(duì)象創(chuàng)建一個(gè)MapState對(duì)象,主要通過(guò)構(gòu)造方法來(lái)實(shí)現(xiàn)。
2、構(gòu)造方法
protected TransactionalMap(IBackingMap<TransactionalValue> backing) {_backing = new CachedBatchReadsMap(backing); }3、beginCommit()
@Override public void beginCommit(Long txid) {_currTx = txid;_backing.reset(); }當(dāng)開(kāi)始處理一個(gè)事務(wù)時(shí),設(shè)置當(dāng)前正在處理的txid,reset()是CachedBatchReadsMap類(lèi)中清空緩存的方法。
TODO: CachedBatchReadsMap分析
4、commit()
@Override public void commit(Long txid) {_currTx = null;_backing.reset(); }當(dāng)一個(gè)事務(wù)處理完成后,將txid設(shè)置為null。
5、multiGet
@Override public List<T> multiGet(List<List<Object>> keys) {List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys);List<T> ret = new ArrayList<T>(vals.size());for(CachedBatchReadsMap.RetVal<TransactionalValue> retval: vals) {TransactionalValue v = retval.val;if(v!=null) {ret.add((T) v.getVal());} else {ret.add(null);}}return ret; }通過(guò)調(diào)用用戶(hù)的_backing.multiGet(keys)來(lái)實(shí)現(xiàn)具體邏輯,作了一些類(lèi)型轉(zhuǎn)換。
6、multiPut()
@Override public void multiPut(List<List<Object>> keys, List<T> vals) {List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size());for(T val: vals) {newVals.add(new TransactionalValue<T>(_currTx, val));}_backing.multiPut(keys, newVals); }同樣只是調(diào)用用戶(hù)定位的multiPut()。
7、multiUpdate()
核心的邏輯在于這幾行:
if(val==null) {newVal = new TransactionalValue<T>(_currTx, updater.update(null));changed = true;} else {if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) {newVal = val;} else {newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal()));changed = true;}}ret.add(newVal.getVal());if(changed) {newVals.add(newVal);newKeys.add(keys.get(i));}}在這之前,先把數(shù)據(jù)get出來(lái),然后判斷:
- 如果key對(duì)應(yīng)的value為空,則changed為true
- 如果key對(duì)應(yīng)的value不為空,而且當(dāng)前的txid與value中的txid相同,則changed保持為false。
- 如果key對(duì)應(yīng)的value不為空,但當(dāng)前的txid與value中的txid不同,則changed為true。
這部分邏輯就是Transactional, NonTransactional和Opaque的差別。
NonTransactional不會(huì)判斷txid,只要來(lái)一批就更新一次。
Opaque基于之前的值作更新。
(四)storm如何調(diào)用MapState的代碼
根據(jù)前面的分析,用戶(hù)在拓?fù)涠x中通過(guò)以下類(lèi)似的代碼來(lái)指定state:
topology.newStream("wordsplit", spout).shuffle().each(new Fields("sentence"), new WordSplit(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);主要看第3、4行,先對(duì)數(shù)據(jù)根據(jù)”word”這個(gè)流進(jìn)行分組,然后再調(diào)用persistentAggregate()方法。再簡(jiǎn)單解釋一下這個(gè)方法,3個(gè)參數(shù)分別為:
* 返回一個(gè)StateFactory對(duì)象,它有一個(gè)makeState()方法,返回一個(gè)State對(duì)象。這個(gè)state對(duì)象就是用戶(hù)定義的MapState,主要定義了如何從state中讀寫(xiě)數(shù)據(jù)。
* 第二個(gè)參數(shù)表示如何對(duì)取出的數(shù)據(jù)進(jìn)行什么操作,這里使用的是Count,如是其它類(lèi),如Sum,則多一個(gè)參數(shù):
* 發(fā)送的消息流。
好,我們下面開(kāi)始分析GroupedStream#persistentAggregate()做了什么東西。
1、GroupedStream類(lèi)
public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(new StateSpec(stateFactory), agg, functionFields); }public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(spec, null, agg, functionFields); }public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields); }很簡(jiǎn)單的代碼邏輯,先使用StateFactory對(duì)象創(chuàng)建一個(gè)StateSpec對(duì)象,然后繼續(xù)調(diào)用,從第3個(gè)方法可以看出,這里還有一個(gè)參數(shù)是表示inputFields,即輸入的field,即對(duì)哪個(gè)field執(zhí)行CombinerAggregator的操作。StateSpec類(lèi)的定義非常簡(jiǎn)單:
public class StateSpec implements Serializable {public StateFactory stateFactory;public Integer requiredNumPartitions = null;public StateSpec(StateFactory stateFactory) {this.stateFactory = stateFactory;} }最終真正調(diào)用的方法是這個(gè):
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {return aggregate(inputFields, agg, functionFields).partitionPersist(spec,TridentUtils.fieldsUnion(_groupFields, functionFields),new MapCombinerAggStateUpdater(agg, _groupFields, functionFields),TridentUtils.fieldsConcat(_groupFields, functionFields)); }這個(gè)方法主要分成2個(gè)步驟
* 第一個(gè)是調(diào)用aggregate()方法,主要如何對(duì)數(shù)據(jù)進(jìn)行操作。這部分我們以后再分析,反正把它理解為一個(gè)數(shù)據(jù)的更新就好了。
* 第二個(gè)是調(diào)用partitionPersist()方法,如何將數(shù)據(jù)寫(xiě)入state。
構(gòu)建一個(gè)ProcessorNode,然后將它添加進(jìn)_topology中。
總結(jié)
以上是生活随笔為你收集整理的trident State应用指南的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: trident API指南
- 下一篇: storm UI解释