Strom小实例,大小写转换
生活随笔
收集整理的這篇文章主要介紹了
Strom小实例,大小写转换
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
結構:
RandomWordSpout:
package cn.itcast.stormdemo; import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class RandomWordSpout extends BaseRichSpout{ private SpoutOutputCollector collector; //模擬一些數據 String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"}; //不斷地往下一個組件發送tuple消息 //這里面是該spout組件的核心邏輯 @Override public void nextTuple() { //可以從kafka消息隊列中拿到數據,簡便起見,我們從words數組中隨機挑選一個商品名發送出去 Random random = new Random(); int index = random.nextInt(words.length); //通過隨機數拿到一個商品名 String godName = words[index]; //將商品名封裝成tuple,發送消息給下一個組件 collector.emit(new Values(godName)); //每發送一個消息,休眠500ms Utils.sleep(500); } //初始化方法,在spout組件實例化時調用一次 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } //聲明本spout組件發送出去的tuple中的數據的字段名 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("orignname")); } } SuffixBolt?:
package cn.itcast.stormdemo; import java.io.FileWriter; import java.io.IOException; import java.util.Map; import java.util.UUID; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; public class SuffixBolt extends BaseBasicBolt{ FileWriter fileWriter = null; //在bolt組件運行過程中只會被調用一次 @Override public void prepare(Map stormConf, TopologyContext context) { try { fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID()); } catch (IOException e) { throw new RuntimeException(e); } } //該bolt組件的核心處理邏輯 //每收到一個tuple消息,就會被調用一次 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //先拿到上一個組件發送過來的商品名稱 String upper_name = tuple.getString(0); String suffix_name = upper_name + "_itisok"; //為上一個組件發送過來的商品名稱添加后綴 try { fileWriter.write(suffix_name); fileWriter.write("\n"); fileWriter.flush(); } catch (IOException e) { throw new RuntimeException(e); } } //本bolt已經不需要發送tuple消息到下一個組件,所以不需要再聲明tuple的字段 @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { } } TopoMain?:
package cn.itcast.stormdemo; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; /** * 組織各個處理組件形成一個完整的處理流程,就是所謂的topology(類似于mapreduce程序中的job) * 并且將該topology提交給storm集群去運行,topology提交到集群后就將永無休止地運行,除非人為或者異常退出 * @author duanhaitao@itcast.cn * */ public class TopoMain { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //將我們的spout組件設置到topology中去 //parallelism_hint :4 表示用4個excutor來執行這個組件 //setNumTasks(8) 設置的是該組件執行時的并發task數量,也就意味著1個excutor會運行2個task builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8); //將大寫轉換bolt組件設置到topology,并且指定它接收randomspout組件的消息 //.shuffleGrouping("randomspout")包含兩層含義: //1、upperbolt組件接收的tuple消息一定來自于randomspout組件 //2、randomspout組件和upperbolt組件的大量并發task實例之間收發消息時采用的分組策略是隨機分組shuffleGrouping builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout"); //將添加后綴的bolt組件設置到topology,并且指定它接收upperbolt組件的消息 builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt"); //用builder來創建一個topology StormTopology demotop = builder.createTopology(); //配置一些topology在集群中運行時的參數 Config conf = new Config(); //這里設置的是整個demotop所占用的槽位數,也就是worker的數量 conf.setNumWorkers(4); conf.setDebug(true); conf.setNumAckers(0); //將這個topology提交給storm集群運行 StormSubmitter.submitTopology("demotopo", conf, demotop); } } UpperBolt?:package cn.itcast.stormdemo; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class UpperBolt extends BaseBasicBolt{ //業務處理邏輯 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //先獲取到上一個組件傳遞過來的數據,數據在tuple里面 String godName = tuple.getString(0); //將商品名轉換成大寫 String godName_upper = godName.toUpperCase(); //將轉換完成的商品名發送出去 collector.emit(new Values(godName_upper)); } //聲明該bolt組件要發出去的tuple的字段 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uppername")); } }
來自為知筆記(Wiz)
RandomWordSpout:
來自為知筆記(Wiz)
轉載于:https://www.cnblogs.com/xiaoxiao5ya/p/caf5daa5f8b5d3861583b24e1f2d8308.html
總結
以上是生活随笔為你收集整理的Strom小实例,大小写转换的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 带数据库的智能合约
- 下一篇: POJ 3352 Road Constr