在storm中使用流
生活随笔
收集整理的這篇文章主要介紹了
在storm中使用流
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
storm是一個強大的流式計算框架,單流的storm在使用中非常普遍,而同時storm也提供對多個流的支持;通過定義多個流,用戶可以進一步的把數據發放到不同的流中進行處理。
代碼如下:
一、 定義多個流的spout
public class MultiStreamRandomWordSpout extends BaseRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collector;// 模擬一些數據String[] words = { "iphone", "xiaomi", "mate", "sony", "sumsung", "moto","meizu" };@Overridepublic void nextTuple() {Random random = new Random();int index = random.nextInt(words.length);// 通過隨機數拿到一個商品名String godName = words[index];// 分別給s1和s2著兩個流中發送一個godName的數據collector.emit("s1", new Values(godName));collector.emit("s2", new Values(godName));// 每發送一個消息,休眠500msUtils.sleep(50000);}@Overridepublic void open(@SuppressWarnings("rawtypes") Map conf,TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {/* 分別給兩個流中聲明一個origianl */declarer.declareStream("s1", new Fields("original"));declarer.declareStream("s2", new Fields("original"));}}二、相關的處理bolt
public class UpperBolt extends BaseBasicBolt{//業務處理邏輯@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {//先獲取到上一個組件傳遞過來的數據,數據在tuple里面String godName = tuple.getString(0);//將商品名轉換成大寫String godName_upper = godName.toUpperCase();//將轉換完成的商品名發送出去collector.emit(new Values(godName_upper));}//聲明該bolt組件要發出去的tuple的字段@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uppername"));}}public class SuffixBolt extends BaseBasicBolt {FileWriter fileWriter = null;// 在bolt組件運行過程中只會被調用一次@Overridepublic void prepare(Map stormConf, TopologyContext context) {// try { // fileWriter = new FileWriter("/home/hadoop/stormoutput/" // + UUID.randomUUID()); // } catch (IOException e) { // throw new RuntimeException(e); // }}// 該bolt組件的核心處理邏輯// 每收到一個tuple消息,就會被調用一次@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {// 先拿到上一個組件發送過來的商品名稱String upper_name = tuple.getString(0);String suffix_name = upper_name + "_itisok";// 為上一個組件發送過來的商品名稱添加后綴System.err.println(suffix_name);// try {// fileWriter.write(suffix_name);// fileWriter.write("\n");// fileWriter.flush();//// } catch (IOException e) {// throw new RuntimeException(e);// }}// 本bolt已經不需要發送tuple消息到下一個組件,所以不需要再聲明tuple的字段@Overridepublic void declareOutputFields(OutputFieldsDeclarer arg0) {}}
三、topo public class MultiStreamTopo {public static void main(String[] args) throws AlreadyAliveException,InvalidTopologyException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("multiSpout", new MultiStreamRandomWordSpout(), 1);builder.setBolt("bolt1", new UpperBolt(), 1).shuffleGrouping("multiSpout", "s1");//讓bolt1來隨機分組的方式消費multiSpout發送的s1流中的數據builder.setBolt("bolt2", new UpperBolt(), 1).shuffleGrouping("multiSpout", "s2");讓bolt2來隨機分組的方式消費multiSpout發送的s2流中的數據builder.setBolt("bolt3", new SuffixBolt(), 1).shuffleGrouping("bolt1").shuffleGrouping("bolt2");//bolt3同時隨機分組的方式消費bolt1和bolt2的默認流中的數據Config config = new Config();config.setDebug(false);config.setNumAckers(0);if (args.length > 0) {StormSubmitter.submitTopology(args[0], config,builder.createTopology());} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", config, builder.createTopology());}} }
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
總結
以上是生活随笔為你收集整理的在storm中使用流的全部內容,希望文章能夠幫你解決所遇到的問題。