HBase Solr secondary index: syncing int data fails with com.ngdata.hbaseindexer.parse.ByteArrayValueMappers: Error mapping byte
The secondary index is implemented with HBase + Key-Value Store Indexer + Solr.
An exception is raised when syncing int data.
The exception is as follows:
2019-12-16 17:39:18,346 WARN com.ngdata.hbaseindexer.parse.ByteArrayValueMappers: Error mapping byte value 101 to int
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3
    at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)
    at com.ngdata.hbaseindexer.parse.ByteArrayValueMappers$1.mapInternal(ByteArrayValueMappers.java:37)
    at com.ngdata.hbaseindexer.parse.ByteArrayValueMappers$AbstractByteValueMapper.map(ByteArrayValueMappers.java:157)
    at com.ngdata.hbaseindexer.morphline.ExtractHBaseCellsBuilder$Mapping.extractWithSingleOutputField(ExtractHBaseCellsBuilder.java:204)
    at com.ngdata.hbaseindexer.morphline.ExtractHBaseCellsBuilder$Mapping.apply(ExtractHBaseCellsBuilder.java:197)
    at com.ngdata.hbaseindexer.morphline.ExtractHBaseCellsBuilder$ExtractHBaseCells.doProcess(ExtractHBaseCellsBuilder.java:83)
    at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:161)
    at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:186)
    at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:161)
    at com.ngdata.hbaseindexer.morphline.LocalMorphlineResultToSolrMapper.map(LocalMorphlineResultToSolrMapper.java:230)
    at com.ngdata.hbaseindexer.morphline.MorphlineResultToSolrMapper.map(MorphlineResultToSolrMapper.java:145)
    at com.ngdata.hbaseindexer.indexer.Indexer$RowBasedIndexer.calculateIndexUpdates(Indexer.java:289)
    at com.ngdata.hbaseindexer.indexer.Indexer.indexRowData(Indexer.java:144)
    at com.ngdata.hbaseindexer.indexer.IndexingEventListener.processEvents(IndexingEventListener.java:98)
    at com.ngdata.sep.impl.SepEventExecutor$1.run(SepEventExecutor.java:97)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The indexer is calling the Bytes.toInt method from the HBase jar. Reproducing the call in IDEA raises the same exception as the hbase-indexer/Solr service:
int a = Bytes.toInt(Bytes.toBytes(111));
System.out.println(a);
System.out.println("-------------------");
int b = Bytes.toInt(Bytes.toBytes("101"));
System.out.println(b);

The output is:

111
-------------------
Exception in thread "main" java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 3
    at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)
    at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)
    at com.example.demo.Test.main(Test.java:23)

Looking at the Bytes.toInt method being called, we can see that it throws explainWrongLengthOrOffset when length != SIZEOF_INT or offset + length > bytes.length; in other words, the exception is raised whenever the byte array we pass in holds fewer than 4 bytes:
public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE; // SIZEOF_INT = 4

/**
 * Converts a byte array to an int value
 * @param bytes byte array
 * @return the int value
 */
public static int toInt(byte[] bytes) {
  return toInt(bytes, 0, SIZEOF_INT);
}

public static int toInt(byte[] bytes, int offset, final int length) {
  if (length != SIZEOF_INT || offset + length > bytes.length) {
    throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_INT);
  }
  if (UNSAFE_UNALIGNED) {
    return UnsafeAccess.toInt(bytes, offset);
  } else {
    int n = 0;
    for (int i = offset; i < (offset + length); i++) {
      n <<= 8;
      n ^= bytes[i] & 0xFF;
    }
    return n;
  }
}

private static IllegalArgumentException explainWrongLengthOrOffset(final byte[] bytes,
    final int offset, final int length, final int expectedLength) {
  String reason;
  if (length != expectedLength) {
    reason = "Wrong length: " + length + ", expected " + expectedLength;
  } else {
    reason = "offset (" + offset + ") + length (" + length + ") exceed the"
        + " capacity of the array: " + bytes.length;
  }
  return new IllegalArgumentException(reason);
}
In Java an int occupies 4 bytes, while the string "101" encodes to only 3 bytes, so whenever the stored string is shorter than 4 bytes the sync fails:
System.out.println("占用字節->"+"101".getBytes().length);輸出如下:占用字節->3這時可以猜想是不是傳入字符串長度 >=?4 時候,就正常了
We insert data via put in the HBase shell:
put 'tableName','test1','fn:comments_count','1111'
put 'tableName','test2','fn:comments_count','11111'

This time there are no errors in the log and the rows are synced to Solr, but the synced values are wrong: they are not the numbers we inserted.
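The wrong values come from Bytes.toInt decoding the ASCII bytes of the stored string as a big-endian integer. A minimal sketch of this effect (the class name below is illustrative, not from the original post):

import org.apache.hadoop.hbase.util.Bytes;

public class WrongValueDemo {
    public static void main(String[] args) {
        // '1111' is stored in HBase as its four ASCII bytes: 0x31 0x31 0x31 0x31
        byte[] raw = Bytes.toBytes("1111");
        // Bytes.toInt reads those four bytes as a big-endian int,
        // so 0x31313131 = 825307441 ends up in Solr instead of 1111.
        System.out.println(Bytes.toInt(raw));
    }
}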
The Cloudera documentation notes that you can implement the com.ngdata.hbaseindexer.parse.ByteArrayValueMapper interface to supply a custom type.
Documentation link: https://docs.cloudera.com/documentation/enterprise/latest/topics/search_hbase_batch_indexer.html#id_dfb_g24_3db
We implement this interface as a custom type to solve the problem above.
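For reference, the interface exposes a single map method. The sketch below is inferred from the implementation shown later in this post rather than copied from the hbase-indexer source, so treat the exact declaration as an assumption:

package com.ngdata.hbaseindexer.parse;

import java.util.Collection;

// Maps the raw bytes of one HBase cell value to zero or more Solr field values.
// Returning an empty collection lets the indexer skip values it cannot parse.
public interface ByteArrayValueMapper {
    Collection<? extends Object> map(byte[] input);
}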
The com.ngdata.hbaseindexer.parse.ByteArrayValueMapper interface ships in hbase-indexer-engine-1.*-cdh*.*.*.jar,
which can be found under /opt/cloudera/parcels/CDH-6.*.*-1.cdh6.*.*.p0.590678/lib/hbase-solr/lib:
> ls hbase-indexer-engine-1.*-cdh*.*.*.jar
hbase-indexer-engine-1.5-cdh6.0.1.jar

Implement the interface:
package com.jast.hbaseindexer;

import com.google.common.collect.ImmutableList;
import com.ngdata.hbaseindexer.parse.ByteArrayValueMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Collection;

public class Jastint implements ByteArrayValueMapper {

    private static Log log = LogFactory.getLog(Jastint.class);

    public Collection<? extends Object> map(byte[] input) {
        try {
            // Treat the cell value as a UTF-8 string and parse it as an int.
            return ImmutableList.of(mapInternal(Bytes.toString(input)));
        } catch (IllegalArgumentException e) {
            // NumberFormatException extends IllegalArgumentException, so bad
            // values are logged and skipped instead of breaking the indexer.
            log.warn(String.format("Error mapping byte value %s to %s",
                    Bytes.toStringBinary(input), int.class.getName()), e);
            return ImmutableList.of();
        }
    }

    private int mapInternal(String toString) {
        return Integer.parseInt(toString);
    }
}

Build this into a jar and copy it to /opt/cloudera/parcels/CDH-6.*.*-1.cdh*.*.*.p0.590678/lib/hbase-solr/lib/; the Key-Value Store Indexer service loads the jars in that directory at startup.
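The mapper can also be sanity-checked locally before deploying. A minimal sketch, assuming the Jastint class above plus its dependencies (guava, commons-logging, hbase-common) are on the classpath; the test values and class name are illustrative:

import com.jast.hbaseindexer.Jastint;
import org.apache.hadoop.hbase.util.Bytes;

public class JastintLocalTest {
    public static void main(String[] args) {
        Jastint mapper = new Jastint();
        // The 3-byte string "101" that crashed the built-in int mapper now parses cleanly.
        System.out.println(mapper.map(Bytes.toBytes("101")));   // prints [101]
        // Non-numeric input is logged and skipped rather than throwing.
        System.out.println(mapper.map(Bytes.toBytes("oops")));  // prints []
    }
}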
Modify the Morphlines configuration as follows, using our custom class com.jast.hbaseindexer.Jastint as the type of the hbase_indexer_vt_fn_comments_count field. Note that the fully qualified name (package + class) must be used here, otherwise the class will not be found:
SOLR_LOCATOR : {
  # Name of solr collection
  collection : hbaseindexer

  # ZooKeeper ensemble
  zkHost : "$ZK_HOST"
}

morphlines : [
  {
    id : WeiBoTableMap
    importCommands : ["org.kitesdk.**", "com.ngdata.**"]
    commands : [
      {
        extractHBaseCells {
          mappings : [
            {
              inputColumn : "fn:name"
              outputField : "hbase_indexer_vt_fn_name"
              type : string
              source : value
            },
            {
              inputColumn : "fn:comments_count"
              outputField : "hbase_indexer_vt_fn_comments_count"
              type : "com.jast.hbaseindexer.Jastint"
              source : value
            },
            {
              inputColumn : "fn:text"
              outputField : "hbase_indexer_vt_fn_text"
              type : string
              source : value
            }
          ]
        }
      }
      { logDebug { format : "output record: {}", args : ["@{}"] } }
    ]
  }
]

Restart the Key-Value Store Indexer service and insert data again:
put 'tableName','test1','fn:comments_count','2587'
put 'tableName','test2','fn:comments_count','2587'

The inserts succeed.
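To confirm what actually landed in the index, the collection can be queried through Solr's standard select handler; the host and port below are placeholders for your own Solr server:

curl "http://<solr-host>:8983/solr/hbaseindexer/select?q=hbase_indexer_vt_fn_comments_count:2587"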
總結