storm-hbase jar包中的bolt节点源码解析
一段時間內,大家都是自己在storm的節點中實現對hbase的操作,不管是普通的topo還是在trident中都是這樣;不知道從哪個版本起,在storm的壓縮包中就多出了好幾個jar包,把針對hbase,mysql,mongodb等等的數據庫的操作都寫好了框架,不需要我們再去自己實現了;這里就先解析一下storm-hbase這個jar包在普通的topo中是怎么實現的:
? ??
package org.apache.storm.hbase.bolt;import org.apache.storm.Config; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.commons.lang.Validate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.storm.hbase.bolt.mapper.HBaseMapper; import org.apache.storm.hbase.common.HBaseClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.IOException; import java.util.HashMap; import java.util.Map;/** * @ClassName: AbstractHBaseBolt * @Description: 抽象的HbaseBolt,在該bolt中不提供任何的功能,只是對一些公用的初始值進行了賦值;具體的功能由他的子類負責,一個子類負責保存,一個子類負責查詢 */ public abstract class AbstractHBaseBolt extends BaseRichBolt {//這里繼承了storm的BaseRichBoltprivate static final Logger LOG = LoggerFactory.getLogger(AbstractHBaseBolt.class);protected OutputCollector collector;//hbaseClient 不參與序列化,這是storm-hbase自己實現的HbaseClient,用來連接和訪問hbase數據庫;在文中后面的地方會說protected transient HBaseClient hBaseClient;//這個bolt所操作的hbase中的表名protected String tableName;//這是storm-hbase中定一個一個接口,作用就是把一個接收到的tuple轉換為hbase對應的rowkey和columnprotected HBaseMapper mapper;protected String configKey;//會在子類中進行賦值,表示對hbase的配置信息在strom的map中的key值/*使用strom-hbase的時候,在初始化topology的時候,在storm的conf中把habse的相關配置設置到一個map中,然后通過這個key在bolt中取到*/public AbstractHBaseBolt(String tableName, HBaseMapper mapper) {Validate.notEmpty(tableName, "Table name can not be blank or null");Validate.notNull(mapper, "mapper can not be null");this.tableName = tableName;this.mapper = mapper;}@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {this.collector = collector;final Configuration hbConfig = HBaseConfiguration.create();Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);if(conf == null) {//在使用storm-hbase的時候,需要在storm的conf中初始化一個map,用來存放hbase對應的配置信息throw new IllegalArgumentException("HBase configuration not found using key '" + 
this.configKey + "'");}if(conf.get("hbase.rootdir") == null) {LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults.");}for(String key : conf.keySet()) {//設置相關的hbase的配置信息hbConfig.set(key, String.valueOf(conf.get(key)));}Map<String, Object> hbaseConfMap = new HashMap<String, Object>(conf);//conf是一個持久化的map,所以這里復制一份給hbaseClient使用//為了能夠向后兼容,需要把TOPOLOGY_AUTO_CREDENTIALS賦值到hbase的配置信息中(有了這個參數nimbus給每個worker自己的憑證信息,然后worker這個憑證信息去訪問habse;在開啟kebers認證的時候用的吧)hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, map.get(Config.TOPOLOGY_AUTO_CREDENTIALS));this.hBaseClient = new HBaseClient(hbaseConfMap, hbConfig, tableName);//創建hbaseClient}@Overridepublic void cleanup() {//正確關閉hbaseClienttry {hBaseClient.close();} catch (IOException e) {LOG.error("HBase Client Close Failed ", e);}} }上面的代碼是一個抽象的hbase的bolt,和我們寫bolt的時候一樣,繼承了BaseRichBolt(雖然我偶爾集成basebasicBolt);在代碼中的prepare方法中主要做了下面的事情
1.初始化了collector,因為只有在運行到prepare方法的時候,才能夠獲得到collector的實例;
2.初始化了hBaseClient,在這里初始化hBaseClient是因為他是transient修飾的,不會被序列化;所以在nimbus下發任務的時候該對象是null,只能在prepare中實例化,這么做的原因是hBaseClient在網絡序列化傳輸的過程中存在很多無法反序列化的情況;(具體一點,就舉一個例子,在hbaseClient中會建立客戶端和集群之間的代理對象,進行rpc通信;所以nimbus不可能把他建立的rpc通信通過網絡傳輸賦值給某個worker;因為就算worker拿到了這個client對象,也并沒有建立rpc通信)
在構造方法中則賦值了可以在網絡之間傳輸的tablename和mapper;
在cleanup方法中,主要負責安全地關閉hbaseClient;
下面來看他的兩個子類,他們分別用來寫入hbase和查詢hbase;主要邏輯就是實現execute方法了。
(1)向hbase寫入數據的bolt(注:原文此處缺少寫入bolt HBaseBolt 的源碼,下面直接給出讀取bolt的代碼)
(2)從hbase中讀取信息的bolt,源碼與注釋如下:
package org.apache.storm.hbase.bolt;import org.apache.commons.lang.Validate; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.storm.hbase.bolt.mapper.HBaseMapper; import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.TupleUtils;import com.google.common.collect.Lists;/*** Basic bolt for querying from HBase.** Note: Each HBaseBolt defined in a topology is tied to a specific table.**/ public class HBaseLookupBolt extends AbstractHBaseBolt {private static final long serialVersionUID = 8253062226922790455L;/*這個是用來將從habse查詢到的result轉化為tuple以及聲明輸出的field字段的,需要自己實現;后面介紹*/private HBaseValueMapper rowToTupleMapper;/*這個主要用于輔助get查詢,里面分裝了要查詢的columFamily 和 columFamily:qualifier 兩數據,以便于在get的時候可以只指定列族,也可以指定特定的列;后面會介紹*/private HBaseProjectionCriteria projectionCriteria;public HBaseLookupBolt(String tableName, HBaseMapper mapper, HBaseValueMapper rowToTupleMapper){super(tableName, mapper);Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null");this.rowToTupleMapper = rowToTupleMapper;}public HBaseLookupBolt withConfigKey(String configKey){this.configKey = configKey;return this;}public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) {this.projectionCriteria = projectionCriteria;return this;}@Overridepublic void execute(Tuple tuple) {if (TupleUtils.isTick(tuple)) {//如果是心跳信息的tuple,那么直接忽略不處理collector.ack(tuple);//直接確認這個tick tuplereturn;}byte[] rowKey = this.mapper.rowKey(tuple);Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria);//構建查詢try {/*其實操作很簡單 就是 table.get(get)操作;他這里看著有些不對勁的地方就是調用批量查詢方法來查詢單個的get;這是為了匹配他們自己封裝的接口,也就是沒有多封裝一下*/Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];for(Values values : 
rowToTupleMapper.toValues(tuple, result)) {//把result轉換為tuple,并逐一發送;this.collector.emit(tuple, values);}this.collector.ack(tuple);//發送確認消息} catch (Exception e) {//出錯處理this.collector.reportError(e);this.collector.fail(tuple);}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {rowToTupleMapper.declareOutputFields(outputFieldsDeclarer);//聲明輸出field字段} }
?接著是hbaseClient的代碼注釋
package org.apache.storm.hbase.common;import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; import org.apache.storm.hbase.security.HBaseSecurityUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.io.Closeable; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Map;public class HBaseClient implements Closeable{private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);private HTable table;//Hbase中的table對象public HBaseClient(Map<String, Object> map , final Configuration configuration, final String tableName) {try {UserProvider provider = HBaseSecurityUtil.login(map, configuration);//這里使用了Hbase的安全登錄方式來進行登錄this.table = provider.getCurrent().getUGI().doAs(new PrivilegedExceptionAction<HTable>() {@Overridepublic HTable run() throws IOException {return new HTable(configuration, tableName);//創建hbase中的table}});} catch(Exception e) {throw new RuntimeException("HBase bolt preparation failed: " + e.getMessage(), e);}}public List<Mutation> constructMutationReq(byte[] rowKey, ColumnList cols, Durability durability) {//創建一個mutation的LIST用來保存/更新List<Mutation> mutations = Lists.newArrayList();if (cols.hasColumns()) {//如果有標準的column要保存Put put = new Put(rowKey);//創建put對象put.setDurability(durability);//設置持久行for (ColumnList.Column col : cols.getColumns()) {if (col.getTs() > 0) {//有時間戳的加時間戳put.add(col.getFamily(),col.getQualifier(),col.getTs(),col.getValue());} else {//沒有時間戳的不加時間戳put.add(col.getFamily(),col.getQualifier(),col.getValue());}}mutations.add(put);//把put緩存起來}if (cols.hasCounters()) {//如果有計數列要保存Increment inc = new Increment(rowKey);//創建一個increment對象inc.setDurability(durability);//設置持久性for (ColumnList.Counter cnt : cols.getCounters()) 
{inc.addColumn(cnt.getFamily(),cnt.getQualifier(),cnt.getIncrement());}mutations.add(inc);}if (mutations.isEmpty()) {//如果即沒有計數也沒有標準的column,那就添加一個空的---也就是只保存rowkeymutations.add(new Put(rowKey));}return mutations;}public void batchMutate(List<Mutation> mutations) throws Exception {//批量提交mutation列表中的插入和更新操作Object[] result = new Object[mutations.size()];try {table.batch(mutations, result);} catch (InterruptedException e) {LOG.warn("Error performing a mutation to HBase.", e);throw e;} catch (IOException e) {LOG.warn("Error performing a mutation to HBase.", e);throw e;}}public Get constructGetRequests(byte[] rowKey, HBaseProjectionCriteria projectionCriteria) {//創建查詢操作Get get = new Get(rowKey);//創建一個habse的get對象if (projectionCriteria != null) {for (byte[] columnFamily : projectionCriteria.getColumnFamilies()) {//獲取要查詢的列族get.addFamily(columnFamily);}for (HBaseProjectionCriteria.ColumnMetaData columnMetaData : projectionCriteria.getColumns()) {//獲取要查詢的列get.addColumn(columnMetaData.getColumnFamily(), columnMetaData.getQualifier());}}return get;}public Result[] batchGet(List<Get> gets) throws Exception {//批量查好多個gettry {return table.get(gets);} catch (Exception e) {LOG.warn("Could not perform HBASE lookup.", e);throw e;}}@Overridepublic void close() throws IOException {//關閉操作table.close();} }在hbaseClient中用到了HBaseSecurityUtil,其代碼注釋如下:
package org.apache.storm.hbase.security;import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;import java.io.IOException; import java.net.InetAddress; import java.util.List; import java.util.Map;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** This class provides util methods for storm-hbase connector communicating* with secured HBase.*/ public class HBaseSecurityUtil {private static final Logger LOG = LoggerFactory.getLogger(HBaseSecurityUtil.class);public static final String STORM_KEYTAB_FILE_KEY = "storm.keytab.file";public static final String STORM_USER_NAME_KEY = "storm.kerberos.principal";private static UserProvider legacyProvider = null;@SuppressWarnings("rawtypes")public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {//Allowing keytab based login for backward compatibility.(為了向后兼容,允許使用keytab)if (UserGroupInformation.isSecurityEnabled() && (conf.get(TOPOLOGY_AUTO_CREDENTIALS) == null ||!(((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHBase.class.getName())))) {//如果開啟了安全認證,并且不是自動的安全認證登錄,那么進行登錄LOG.info("Logging in using keytab as AutoHBase is not specified for " + TOPOLOGY_AUTO_CREDENTIALS);//insure that if keytab is used only one login per process executedif(legacyProvider == null) {synchronized (HBaseSecurityUtil.class) {if(legacyProvider == null) {//初始化一個userProviderlegacyProvider = UserProvider.instantiate(hbaseConfig);/*下面就是一堆安全登錄的代碼,有興趣自己看*/String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);if (keytab != null) {hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);}String userName = (String) conf.get(STORM_USER_NAME_KEY);if (userName != null) {hbaseConfig.set(STORM_USER_NAME_KEY, userName);}legacyProvider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,InetAddress.getLocalHost().getCanonicalHostName());}}}return legacyProvider;} else 
{//如果布開啟安全認證或者是自動安全認證,那么就直接實例化一個userProviderreturn UserProvider.instantiate(hbaseConfig);}} }然后是HBaseMapper的代碼注釋:
package org.apache.storm.hbase.bolt.mapper;import org.apache.storm.tuple.Tuple; import org.apache.storm.hbase.common.ColumnList;import java.io.Serializable;/*** Maps a <code>org.apache.storm.tuple.Tuple</code> object* to a row in an HBase table.* 這是一個接口,需要自己實現,主要提供了兩個方法,一個是如何從tuple中獲取rowKey,一個是如何從tuple中獲取columns* 自帶一個簡單實現,在下面的代碼中會看到*/ public interface HBaseMapper extends Serializable {/*** Given a tuple, return the HBase rowkey.** @param tuple* @return*/byte[] rowKey(Tuple tuple);/*** Given a tuple, return a list of HBase columns to insert.** @param tuple* @return*/ColumnList columns(Tuple tuple);}他的簡單實現如下: package org.apache.storm.hbase.bolt.mapper;import static org.apache.storm.hbase.common.Utils.toBytes; import static org.apache.storm.hbase.common.Utils.toLong;import org.apache.storm.hbase.common.ColumnList; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple;public class SimpleHBaseMapper implements HBaseMapper {private static final long serialVersionUID = 1L;private String rowKeyField;//hbase中的rowKey在tuple中對應的fieldprivate byte[] columnFamily;//列族private Fields columnFields;//hbase中普通列對應的tuple中的字段數組private Fields counterFields;//hbase中計數列對應的tuple中的字段數組public SimpleHBaseMapper(){}public SimpleHBaseMapper withRowKeyField(String rowKeyField){//就是一堆set方法,不過鏈式調用起來很爽的感覺···this.rowKeyField = rowKeyField;return this;}public SimpleHBaseMapper withColumnFields(Fields columnFields){this.columnFields = columnFields;return this;}public SimpleHBaseMapper withCounterFields(Fields counterFields){this.counterFields = counterFields;return this;}public SimpleHBaseMapper withColumnFamily(String columnFamily){this.columnFamily = columnFamily.getBytes();return this;}@Overridepublic byte[] rowKey(Tuple tuple) {//返回rowKey的值Object objVal = tuple.getValueByField(this.rowKeyField);return toBytes(objVal);}@Overridepublic ColumnList columns(Tuple tuple) {//這個columnList也是storm-hbase自定義的,見下面ColumnList cols = new ColumnList();if(this.columnFields != 
null){//如果普通的列的字段不為空,那么給添加一個hbase中的列數據{列族-->列-->值}for(String field : this.columnFields){cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));}}if(this.counterFields != null){//如果計數列的字段不為空,那么添加一個 計數值{列族-->列-->計數值(一定是long類型的)}for(String field : this.counterFields){cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));}}return cols;} }
在很多地方用到了ColumnList,其代碼注釋如下: package org.apache.storm.hbase.common;import java.util.ArrayList; import java.util.List;/*** Represents a list of HBase columns.代表一個hbase的column的列表** There are two types of columns, <i>standard</i> and <i>counter</i>.有兩種類型的column,分別是標準的和計數的** Standard columns have <i>column family</i> (required), <i>qualifier</i> (optional),* <i>timestamp</i> (optional), and a <i>value</i> (optional) values.* 標準的列有column family(必須有),qualifier(可選),時間錯(可選)以及值(可選) ;** Counter columns have <i>column family</i> (required), <i>qualifier</i> (optional),* and an <i>increment</i> (optional, but recommended) values.*計數列有columnfamily(必須有),qualifier(可選)以及一個要增加的數值(可選,但是建議填上)*** Inserts/Updates can be added via the <code>addColumn()</code> and <code>addCounter()</code>* methods.*插入/更新操作可以通過addColumn()和addCounter()來添加到對象中**/ public class ColumnList {public static abstract class AbstractColumn {//一個抽象的column,提供了最基本的famliy和qualifierbyte[] family, qualifier;AbstractColumn(byte[] family, byte[] qualifier){this.family = family;this.qualifier = qualifier;}public byte[] getFamily() {return family;}public byte[] getQualifier() {return qualifier;}}public static class Column extends AbstractColumn {//標準的columnbyte[] value;long ts = -1;//默認的時間戳為-1Column(byte[] family, byte[] qualifier, long ts, byte[] value){super(family, qualifier);this.value = value;this.ts = ts;}public byte[] getValue() {return value;}public long getTs() {return ts;}}public static class Counter extends AbstractColumn {//計數的columnlong incr = 0;//默認計數值為0Counter(byte[] family, byte[] qualifier, long incr){super(family, qualifier);this.incr = incr;}public long getIncrement() {return incr;}}private ArrayList<Column> columns;private ArrayList<Counter> counters;private ArrayList<Column> columns(){if(this.columns == null){this.columns = new ArrayList<Column>();}return this.columns;}private ArrayList<Counter> counters(){if(this.counters == null){this.counters = new ArrayList<Counter>();}return this.counters;}/*** Add a 
standard HBase column.** @param family* @param qualifier* @param ts* @param value* @return*/public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, byte[] value){//添加一個標準的column到對象中columns().add(new Column(family, qualifier, ts, value));return this;}/*** Add a standard HBase column* @param family* @param qualifier* @param value* @return*/public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value){columns().add(new Column(family, qualifier, -1, value));return this;}/*** Add a standard HBase column given an instance of a class that implements* the <code>IColumn</code> interface.* @param column* @return*/public ColumnList addColumn(IColumn column){return this.addColumn(column.family(), column.qualifier(), column.timestamp(), column.value());}/*** Add an HBase counter column.** @param family* @param qualifier* @param incr* @return*/public ColumnList addCounter(byte[] family, byte[] qualifier, long incr){//添加一個計數columncounters().add(new Counter(family, qualifier, incr));return this;}/*** Add an HBase counter column given an instance of a class that implements the* <code>ICounter</code> interface.* @param counter* @return*/public ColumnList addCounter(ICounter counter){return this.addCounter(counter.family(), counter.qualifier(), counter.increment());}/*** Query to determine if we have column definitions.** @return*/public boolean hasColumns(){return this.columns != null;}/*** Query to determine if we have counter definitions.** @return*/public boolean hasCounters(){return this.counters != null;}/*** Get the list of column definitions.** @return*/public List<Column> getColumns(){return this.columns;}/*** Get the list of counter definitions.* @return*/public List<Counter> getCounters(){return this.counters;}}
在ColumnList中有通過IColumn和ICounter來添加標準column和計數column的方法,對應的接口如下: /*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package org.apache.storm.hbase.common;/*** Interface definition for classes that support being written to HBase as* a regular column.**/ public interface IColumn {byte[] family();byte[] qualifier();byte[] value();long timestamp(); }
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package org.apache.storm.hbase.common;/*** Interface definition for classes that support being written to HBase as* a counter column.**/ public interface ICounter {byte[] family();byte[] qualifier();long increment(); }
HBaseProjectionCriteria的代碼注釋如下:(注:原文此處誤貼了與上文完全相同的HBaseLookupBolt源碼,并非HBaseProjectionCriteria的代碼)
package org.apache.storm.hbase.bolt;import org.apache.commons.lang.Validate; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.storm.hbase.bolt.mapper.HBaseMapper; import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; import org.apache.storm.hbase.bolt.mapper.HBaseValueMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.TupleUtils;import com.google.common.collect.Lists;/*** Basic bolt for querying from HBase.** Note: Each HBaseBolt defined in a topology is tied to a specific table.**/ public class HBaseLookupBolt extends AbstractHBaseBolt {private static final long serialVersionUID = 8253062226922790455L;/*這個是用來將從habse查詢到的result轉化為tuple以及聲明輸出的field字段的,需要自己實現;后面介紹*/private HBaseValueMapper rowToTupleMapper;/*這個主要用于輔助get查詢,里面分裝了要查詢的columFamily 和 columFamily:qualifier 兩數據,以便于在get的時候可以只指定列族,也可以指定特定的列;后面會介紹*/private HBaseProjectionCriteria projectionCriteria;public HBaseLookupBolt(String tableName, HBaseMapper mapper, HBaseValueMapper rowToTupleMapper){super(tableName, mapper);Validate.notNull(rowToTupleMapper, "rowToTupleMapper can not be null");this.rowToTupleMapper = rowToTupleMapper;}public HBaseLookupBolt withConfigKey(String configKey){this.configKey = configKey;return this;}public HBaseLookupBolt withProjectionCriteria(HBaseProjectionCriteria projectionCriteria) {this.projectionCriteria = projectionCriteria;return this;}@Overridepublic void execute(Tuple tuple) {if (TupleUtils.isTick(tuple)) {//如果是心跳信息的tuple,那么直接忽略不處理collector.ack(tuple);//直接確認這個tick tuplereturn;}byte[] rowKey = this.mapper.rowKey(tuple);Get get = hBaseClient.constructGetRequests(rowKey, projectionCriteria);//構建查詢try {/*其實操作很簡單 就是 table.get(get)操作;他這里看著有些不對勁的地方就是調用批量查詢方法來查詢單個的get;這是為了匹配他們自己封裝的接口,也就是沒有多封裝一下*/Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];for(Values values : 
rowToTupleMapper.toValues(tuple, result)) {//把result轉換為tuple,并逐一發送;this.collector.emit(tuple, values);}this.collector.ack(tuple);//發送確認消息} catch (Exception e) {//出錯處理this.collector.reportError(e);this.collector.fail(tuple);}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {rowToTupleMapper.declareOutputFields(outputFieldsDeclarer);//聲明輸出field字段} }
最后是HBaseValueMapper的代碼注釋: package org.apache.storm.hbase.bolt.mapper;import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.ITuple; import org.apache.storm.tuple.Values; import org.apache.hadoop.hbase.client.Result;import java.io.Serializable; import java.util.List;//這是一個接口,主要定義了如何把hbase中的查詢結果映射人tuple中的values以及如何聲明tuple的輸出字段 public interface HBaseValueMapper extends Serializable {/**** @param input tuple.* @param result HBase lookup result instance.* @return list of values that should be emitted by the lookup bolt.* @throws Exception*/public List<Values> toValues(ITuple input, Result result) throws Exception;/*** declares the output fields for the lookup bolt.* @param declarer*/void declareOutputFields(OutputFieldsDeclarer declarer); }
總結
以上是生活随笔為你收集整理的storm-hbase jar包中的bolt节点源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: html 经常会用到的英语名词
- 下一篇: 无法打开多维数据集(使用Dundas的O