ack机制之代码实现,实现BaseRichBolt的方式,使用BaseBasicBolt的方式实现BaseRichBolt发ack和fail的功能
生活随笔
收集整理的這篇文章主要介紹了
ack机制之代码实现,实现BaseRichBolt的方式,使用BaseBasicBolt的方式实现BaseRichBolt发ack和fail的功能
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
代碼結構如下:
pom文件內容如下:
AckSpout如下:
package cn.toto.storm.ack;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values;import java.util.List; import java.util.Map; import java.util.UUID;/*** 代碼說明** @author tuzq* @create 2017-06-21 14:27*/ public class AckSpout extends BaseRichSpout {private SpoutOutputCollector collector;@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}//每次調用一次就發送一條消息@Overridepublic void nextTuple() {//生產一條數據String uuid = UUID.randomUUID().toString().replace("_","");collector.emit(new Values(uuid),new Values(uuid));try{Thread.sleep(5 * 10000);} catch(Exception e) {e.printStackTrace();}}//的定義發送的字段@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));}@Overridepublic void ack(Object msgId) {System.out.println("xiaoxi:" + msgId);}@Overridepublic void fail(Object msgId) {System.out.println("xiaoxi" + msgId);collector.emit((List)msgId,msgId);} }Bolt1的代碼如下:
package cn.toto.storm.ack;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.Map;/*** 代碼說明** @author tuzq* @create 2017-06-21 14:56*/ public class Bolt1 extends BaseRichBolt {private OutputCollector collector;//初始化方法,只調用一次@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}//被循環調用@Overridepublic void execute(Tuple input) {collector.emit(input,new Values(input.getString(0)));System.out.println("bolt1的execute方法被調用一次" + input.getString(0));collector.ack(input);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));} }Bolt2的代碼如下:
package cn.toto.storm.ack;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.Map;/*** 代碼說明** @author tuzq* @create 2017-06-21 15:01*/ public class Bolt2 extends BaseRichBolt {private OutputCollector collector;//初始化方法,只調動一次@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}//被循環調用@Overridepublic void execute(Tuple input) {collector.emit(input,new Values(input.getString(0)));System.out.println("bolt2的execute方法被調用一次" + input.getString(0));collector.ack(input);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));} }Bolt3的代碼如下:
package cn.toto.storm.ack;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.Map;/*** 代碼說明** @author tuzq* @create 2017-06-21 15:04*/ public class Bolt3 extends BaseRichBolt {private OutputCollector collector;//初始化方法,只調用一次@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}//被循環調用@Overridepublic void execute(Tuple input) {collector.emit(input,new Values(input.getString(0)));System.out.println("bolt3的execute方法被調用一次" + input.getString(0));collector.fail(input);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));} }Bolt4的代碼如下:
package cn.toto.storm.ack;/*** Created by toto on 2017/6/21.*/import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;import java.util.Map;/*** 代碼說明** @author tuzq* @create 2017-06-21 15:06*/ public class Bolt4 extends BaseRichBolt {private OutputCollector collector;//初始化方法,只調用一次@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}//被循環調用@Overridepublic void execute(Tuple input) {collector.emit(input,new Values(input.getString(0)));System.out.println("bolt4的execute方法調用一次" + input.getString(0));collector.ack(input);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));} }右鍵運行項目
案例2
AckSpout代碼如下:
Bolt1的配置如下:
package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;/*** 代碼說明** @author tuzq* @create 20`這里寫代碼片`17-06-21 15:32*/ public class Bolt1 extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {collector.emit(new Values(input.getString(0)));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));} }Bolt2的配置如下:
package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;/*** 代碼說明** @author tuzq* @create 2017-06-21 15:33*/ public class Bolt2 extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {collector.emit(new Values(input.getString(0)));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));} }Bolt3的代碼配置如下:
package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;/*** 代碼說明** @author tuzq* @create 2017-06-21 15:34*/ public class Bolt3 extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {collector.emit(new Values(input.getString(0)));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));} }Bolt4的代碼配置如下:
package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;/*** 代碼說明** @author tuzq* @create 2017-06-21 15:35*/ public class Bolt4 extends BaseBasicBolt {@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {collector.emit(new Values(input.getString(0)));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("uuid"));} }AckTopologyDriver的代碼如下:
package cn.toto.storm.basebasicbolt;/*** Created by toto on 2017/6/21.*/import cn.toto.storm.ack.AckSpout; import cn.toto.storm.ack.Bolt1; import cn.toto.storm.ack.Bolt3; import cn.toto.storm.ack.Bolt4; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder;/*** 代碼說明** @author tuzq* @create 2017-06-21 15:37*/ public class AckTopologyDriver {public static void main(String[] args) {//1、準備任務信息TopologyBuilder topologyBuilder = new TopologyBuilder();topologyBuilder.setSpout("mySpout", new AckSpout(), 1);topologyBuilder.setBolt("bolt1",new Bolt1(),1).shuffleGrouping("mySpout");topologyBuilder.setBolt("bolt2",new Bolt2(),1).shuffleGrouping("bolt1");topologyBuilder.setBolt("bolt3",new Bolt3(),1).shuffleGrouping("bolt2");topologyBuilder.setBolt("bolt4",new Bolt4(),1).shuffleGrouping("bolt3");Config config = new Config();config.setNumWorkers(2);StormTopology stormTopology = topologyBuilder.createTopology();LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("wordcount",config,stormTopology);} }當失敗了的時候,拋出:throw FailedException ,然后可以實現類似fail()方法。
總結
以上是生活随笔為你收集整理的ack机制之代码实现,实现BaseRichBolt的方式,使用BaseBasicBolt的方式实现BaseRichBolt发ack和fail的功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ack是什么,如何使用Ack机制,如何关
- 下一篇: 一键借多久能到账