Storm Development on an Online Real-Time Big Data Platform: wordcount
You can import the storm-starter project into Eclipse via Maven; here we instead add the jar files from the lib directory of the Storm distribution directly to the project.
Because storm-core-1.0.1.jar contains defaults.yaml, bundling it into the topology jar causes a duplicate-configuration-file error at submit time. So when packaging in Eclipse, use Export -> JAR file rather than Export -> Runnable JAR file, which leaves the third-party jars out.
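If the project is built with Maven rather than packaged through the Eclipse export dialog, the usual way to get the same effect is to declare storm-core with provided scope, so it is available at compile time but left out of the packaged topology jar. A minimal sketch of the dependency, assuming the 1.0.1 version mentioned above:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.1</version>
    <!-- "provided": available at compile time, supplied by the cluster
         at run time, so it is not bundled into the topology jar -->
    <scope>provided</scope>
</dependency>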
Below we develop a wordcount example; walking through the code is a good way to understand the flow of the Storm computation framework. The code is as follows:
1、topology類
package cn.wc;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 1)
               .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);

        // Run the topology in local mode for 20 seconds, then shut it down
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(20000);
        cluster.shutdown();
    }
}

Package the classes into wc.jar and submit the topology with:

storm jar /mnt/wc.jar cn.wc.TopologyMain /mnt/words.txt

2. Spout class

package cn.wc;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// The spout is the data source: it reads a text file and emits each of its
// lines to the bolts.
public class WordReader extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {
    }

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The core method of the spout: read the text file and emit each of its
     * lines (to the bolts). Storm calls this method in a loop, so once the
     * file has been read we sleep briefly to reduce CPU usage.
     */
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Emit one tuple per line; Values is an ArrayList implementation.
            // The line itself is passed as the message ID, which is what makes
            // the ack/fail callbacks above meaningful.
            while ((str = reader.readLine()) != null) {
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Open the file and store the collector. The three parameters are the
     * configuration given when the topology was created, the topology
     * context, and the collector used to emit the spout's tuples to the bolts.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            // The file path to read is specified when the topology is created
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        // Initialize the collector
        this.collector = collector;
    }

    /**
     * Declare the output field "line"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

3. Bolt classes

The spout reads the file and emits every line as a tuple (in Storm, data is passed around as tuples). We now need two bolts: one to split each line into words, and one to count the words. The key method of a bolt is execute(), which is called once for every incoming tuple.

package cn.wc;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

    public void cleanup() {
    }

    /**
     * Called once per tuple received from the spout: normalize the line by
     * lower-casing it and splitting it into words, then emit each word to
     * the next bolt.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * This bolt emits only the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
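Note that WordNormalizer extends BaseBasicBolt, which anchors emitted tuples and acks the input automatically after execute() returns. The same bolt written against BaseRichBolt has to do both by hand; the sketch below is not part of the original example, just an illustration of what BaseBasicBolt saves you:

package cn.wc;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ManualAckNormalizer extends BaseRichBolt {

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        for (String word : input.getString(0).split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                // Anchor each emitted word to the input tuple, so a failure
                // downstream is replayed from the spout (its fail() is called)
                collector.emit(input, new Values(word.toLowerCase()));
            }
        }
        // Ack only after all words have been emitted
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}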
The second bolt, WordCounter, keeps a per-task map of counts and prints it when the topology shuts down:

package cn.wc;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * Called when the topology is shut down. Cleanup work such as closing
     * connections or releasing resources goes here; in this example we
     * print the final word counts.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        // If the word is not yet in the map, create an entry for it;
        // otherwise add 1 to its count.
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}
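TopologyMain above runs everything in local mode through LocalCluster and kills the topology after 20 seconds. To run the same wordcount on a real cluster you would submit it with StormSubmitter instead. Below is a minimal sketch, not part of the original example; the topology name is illustrative, and the words file path must exist on the worker node that hosts the spout:

package cn.wc;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class ClusterTopologyMain {
    public static void main(String[] args) throws Exception {
        // Same wiring as TopologyMain
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        // fieldsGrouping on "word" routes every occurrence of a given word
        // to the same WordCounter task, so the counts stay correct even
        // with more than one counter task.
        builder.setBolt("word-counter", new WordCounter(), 1)
               .fieldsGrouping("word-normalizer", new Fields("word"));

        Config conf = new Config();
        conf.put("wordsFile", args[0]);

        // Submit to the cluster instead of running a LocalCluster;
        // the topology keeps running until it is killed explicitly.
        StormSubmitter.submitTopology("wordcount-topology", conf, builder.createTopology());
    }
}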
Summary
That is the complete Storm wordcount example; hopefully it helps you solve the problems you have run into.