storm详解
1. 構建拓撲代碼
2.一級過濾bolt
package?demo;import?java.util.Map;import?backtype.storm.task.TopologyContext; import?backtype.storm.topology.BasicOutputCollector; import?backtype.storm.topology.IBasicBolt; import?backtype.storm.topology.OutputFieldsDeclarer; import?backtype.storm.tuple.Fields; import?backtype.storm.tuple.Tuple; import?backtype.storm.tuple.Values; //一級的過濾bolt public?class?AreaFilterBolt?implements?IBasicBolt?{@Overridepublic?void?declareOutputFields(OutputFieldsDeclarer?declarer)?{//?TODO?Auto-generated?method?stubdeclarer.declare(new?Fields("area_id","order_amt","create_time"));//tuple里面每個value的對應name}@Overridepublic?Map<String,?Object>?getComponentConfiguration()?{//?TODO?Auto-generated?method?stubreturn?null;}@Overridepublic?void?cleanup()?{//?TODO?Auto-generated?method?stub}@Overridepublic?void?execute(Tuple?input,?BasicOutputCollector?collector)?{//order_id,order_amt,create_time,area_idString?order=input.getString(0);//取出集合values中的第一個valueif(order!=null){String?orderArr[]=order.split("\\t");collector.emit(new?Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2],?DateFmt.date_short)));//area_id,order_amt,create_time}}@Overridepublic?void?prepare(Map?arg0,?TopologyContext?arg1)?{//?TODO?Auto-generated?method?stub}}3.局部匯總bolt(按日期和區域和匯總)
package?demo;import?java.util.HashMap; import?java.util.Map;import?backtype.storm.task.TopologyContext; import?backtype.storm.topology.BasicOutputCollector; import?backtype.storm.topology.IBasicBolt; import?backtype.storm.topology.OutputFieldsDeclarer; import?backtype.storm.tuple.Fields; import?backtype.storm.tuple.Tuple; import?backtype.storm.tuple.Values;//局部匯總 public?class?AreaAmtBolt?implements?IBasicBolt?{Map<String,Double>?countsMap=null;@Overridepublic?void?declareOutputFields(OutputFieldsDeclarer?declarer)?{declarer.declare(new?Fields("date_area","amt"));}@Overridepublic?Map<String,?Object>?getComponentConfiguration()?{//?TODO?Auto-generated?method?stubreturn?null;}@Overridepublic?void?prepare(Map?paramMap,?TopologyContext?paramTopologyContext)?{//?TODO?Auto-generated?method?stubcountsMap?=new?HashMap<String,?Double>();}@Overridepublic?void?execute(Tuple?input,BasicOutputCollector?collector)?{if(input!=null)//如果spout端沒數據就會發空值,所以要做判斷再往下發{String?area_id=input.getString(0);Double?order_amt=input.getDouble(1);String??order_date=input.getStringByField("order_date");Double?count=countsMap.get(area_id+"_"+order_date);if?(count==null){count?=?0.0;????}count+=order_amt;countsMap.put(area_id+"_"+order_date,count);System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);collector.emit(new?Values(area_id+"_"+order_date,count));}}@Overridepublic?void?cleanup()?{countsMap.clear();}}4. 最終結果寫入Hbase
package?demo;import?java.util.HashMap; import?java.util.HashSet; import?java.util.Map; import?java.util.Set;import?backtype.storm.task.TopologyContext; import?backtype.storm.topology.BasicOutputCollector; import?backtype.storm.topology.IBasicBolt; import?backtype.storm.topology.OutputFieldsDeclarer; import?backtype.storm.tuple.Tuple;//結果定時寫入hbase的bolt public?class?AreaRsltBolt?implements?IBasicBolt?{Map<String,Double>?countsMap=null;long?beginTime=System.currentTimeMillis();long?endTime=0L;HBaseDao?dao=null;@Overridepublic?void?declareOutputFields(OutputFieldsDeclarer?paramOutputFieldsDeclarer)?{//?TODO?Auto-generated?method?stub}@Overridepublic?Map<String,?Object>?getComponentConfiguration()?{//?TODO?Auto-generated?method?stubreturn?null;}@Overridepublic?void?prepare(Map?paramMap,?TopologyContext?paramTopologyContext)?{countsMap?=new?HashMap<String,?Double>();dao=new?HBaseDAOImp();}@Overridepublic?void?execute(Tuple?input,BasicOutputCollector?paramBasicOutputCollector)?{String?date_areaid=input.getString(0);double??order_amt=input.getDouble(1);?countsMap.put(date_areaid,order_amt);endTime=System.currentTimeMillis();if?(endTime-beginTime>=5*1000){for(String?key:countsMap.keySet()){//put?into?hbase//2014-05-05_1,amtdao.insert("area_order","cf","order_amt",countsMap.get(key));System.err.println("rsltBolt?put?hbase:?key="+key+";?order_amt="+countsMap.get(key));}beginTime=System.currentTimeMillis();}}@Overridepublic?void?cleanup()?{//?TODO?Auto-generated?method?stub}}5. DateFmt代碼
package?demo;import?java.text.ParseException; import?java.text.SimpleDateFormat; import?java.util.Calendar; import?java.util.Date;public?class?DateFmt?{public?static?final?String?date_long="yyyy-MM-dd?HH:mm:ss";public?static?final?String?date_short="yyyy-MM-dd";public?static?SimpleDateFormat?sdf=new?SimpleDateFormat(date_short);public?static?String?getCountDate(String?date,String?patton){SimpleDateFormat?sdf=new?SimpleDateFormat(patton);Calendar?cal?=Calendar.getInstance();if?(date!=null){try?{cal.setTime(sdf.parse(date));}?catch?(ParseException?e)?{e.printStackTrace();}}return?sdf.format(cal.getTime());}public?static?Date?parseDate(String?dateStr)?throws?Exception{return?sdf.parse(dateStr);}public?static?void?main(String[]?args)?{System.out.println(DateFmt.getCountDate("2015-09-08?09:09:08?",?DateFmt.date_long));} }本文出自 “點滴積累” 博客,請務必保留此出處http://tianxingzhe.blog.51cto.com/3390077/1701319
總結
- 上一篇: 我所认识的JavaScript正则表达式
- 下一篇: 邮箱自动补全