Storm里面fieldsGrouping和Field的概念详解
這個Field通常和fieldsGrouping分組機制一起使用,這個Field特別難理解,我自己也是在網上看了好多文章,感覺依舊講的不是很清楚,是似而非,沒有抓到重點。這個問題足足困擾了我3-4天時間,一直理解不了Field的概念,
當前我覺得new?Fields("word")就相當于表的表頭,就是定義這個域,這個域里面放的東西,是emit進去的
如果在declareOutputFields方法中new Fields("word1","word2")有2個及以上的fields,則在emit數據時new Value要與其對應(相當于key與value的關系),然后在topology組裝時,fieldsGrouping中的new Fields()可以為new Fields("word1")或new Fields("word2")或new Fields("word1",”word2")來指定接受上游spout或bolt的哪些fields
官方文檔里有這么一句話:“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”
一個task就是一個處理邏輯的實例,所以fields能根據tuple stream的id,也就是下面定義的xxx
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("xxx"));
}
xxx所代表的具體內容會由某一個task來處理,并且同一個xxx對應的內容,處理這個內容的task實例是同一個。
比如說:
bolt第一次emit三個流,即xxx有luonq pangyang qinnl三個值,假設分別建立三個task實例來處理:
luonq -> instance1
pangyang -> instance2
qinnl -> instance3
然后第二次emit四個流,即xxx有luonq qinnanluo py pangyang四個值,假設還是由剛才的三個task實例來處理:
luonq -> instance1
qinnanluo -> instance2
py -> instance3
pangyang -> instance2
然后第三次emit兩個流,即xxx有py qinnl兩個值,假設還是由剛才的三個task實例來處理:
py -> instance3
qinnl -> instance3
最后我們看看三個task實例都處理了哪些值,分別處理了多少次:
instance1: luonq(處理2次)
instance2: pangyang(處理2次) qinnanluo(處理1次)
instance3: qinnl(處理2次) py(處理2次)
結論:
1. emit發出的值第一次由哪個task實例處理是隨機的,此后再次出現這個值,就固定由最初處理他的那個task實例再次處理,直到topology結束
2. 一個task實例可以處理多個emit發出的值
3. 和shuffle Grouping的區別就在于,shuffle Grouping當emit發出同樣的值時,處理他的task是隨機的
例子1:
第一步:定義了一個表頭
public?void?declareOutputFields(OutputFieldsDeclarer?declarer)
????{
????????declarer.declare(new?Fields("word"));
????}
第二步:往這個Field空間里面emit進去內容(可以是Bolt和Spolt)
public?void?execute(Tuple?input,?BasicOutputCollector?collector)
????{
????????String?sentence?=?input.getString(0);
????????String[]?words?=?sentence.split("?");
????????for?(String?word?:?words)
????????{
????????????word?=?word.trim();
????????????if?(!word.isEmpty())
????????????{
????????????????word?=?word.toLowerCase();
????????????????collector.emit(new?Values(word));
????????????}
????????}
????}
第三步:關聯步驟
TopologyBuilder?builder?=?new?TopologyBuilder();
builder.setSpout("word-reader",new?WordReader());
builder.setBolt("word-normalizer",?new?WordNormalizer()).shuffleGrouping("word-reader");
Integer?number?=?2;
builder.setBolt("word-counter",?new?WordCounter(),?4).fieldsGrouping("word-normalizer",?new Fields("word"));
第四步:
最終實現的結果:
Field:Word
? ? ? ? ? ? the
? ? ? ? ? ? sporm
? ? ? ? ? ? is
? ? ? ? ? ? ...
例子2:
第一步:
public?void?declareOutputFields(OutputFieldsDeclarer?declarer)
{
? ? ? declarer.declare(new?Fields("word",?"count"));
}
第二步:
public?void?execute(Tuple?tuple,?BasicOutputCollector?collector)
?{
????????????String?word?=?tuple.getString(0);
????????????Integer?count?=?counts.get(word);
????????????if?(count?==?null)
????????????????count?=?0;
????????????count++;
????????????counts.put(word,?count);
????????????collector.emit(new?Values(word,?count));
}
第三步:
Fields("word",?"count")
? ? ? ? ? ? “is”,1
? ? ? ? ? ? “sporm”,3
? ? ? ? ? ? “the”,2
? ? ? ? ? ? ? .....
例子3:
D:\.....\Workspaces\MyEclipse?8.5\bigData\examples-ch06-real-life-app-master\src\main\java\storm\analytics\....
第一步:
TopologyBuilder?builder?=?new?TopologyBuilder();
builder.setSpout("read-feed",?new?UsersNavigationSpout(),?3);
builder.setBolt("get-categ",?new?GetCategoryBolt(),?3).shuffleGrouping("read-feed");
builder.setBolt("user-history",?new?UserHistoryBolt(),?5).fieldsGrouping("get-categ",?new?Fields("user"));
第二步:發送者輸出是三個結構體:Fields("user","product",?"categ")
GetCategoryBolt.java
public?void?execute(Tuple?input,?BasicOutputCollector?collector)
?{
????????NavigationEntry?entry?=?(NavigationEntry)input.getValue(1);
????????if("PRODUCT".equals(entry.getPageType())){
????????????try?{
????????????????String?product?=?(String)entry.getOtherData().get("product");
????????????????//?Call?the?items?API?to?get?item?information
????????????????Product?itm?=?reader.readItem(product);
????????????????if(itm?==null)
????????????????????return?;
????????????????String?categ?=?itm.getCategory();
????????????????collector.emit(new?Values(entry.getUserId(),?product,?categ));
????????????}?catch?(Exception?ex)?{
????????????????System.err.println("Error?processing?PRODUCT?tuple"+?ex);
????????????????ex.printStackTrace();
????????????}
????????}
????}
????@Override
????public?void?declareOutputFields(OutputFieldsDeclarer?declarer)?{
????????declarer.declare(new?Fields("user","product",?"categ"));
????}
第三步:new?Fields("user"))只取Fields("user","product",?"categ"))中的User
builder.setBolt("user-history",?new?UserHistoryBolt(),?5).fieldsGrouping("get-categ",?new?Fields("user"));
---------------------
作者:VessalasdXZ
來源:CSDN
原文:https://blog.csdn.net/vessalasd1/article/details/50472123
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
轉載于:https://www.cnblogs.com/wangjing666/p/10025458.html
總結
以上是生活随笔為你收集整理的Storm里面fieldsGrouping和Field的概念详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 冲刺7
- 下一篇: 项目Alpha冲刺 Day11