java自定义 filter,HBase自定义Filter
必需要提前說明下:不建議使用自定義的Filter。所有的Filter都是在服務(wù)端生效:就是說需要將自定義的Filter封裝為jar,上傳到HBase的類路徑下,并重啟HBase使之生效。對于生產(chǎn)環(huán)境的HBase來說,重啟通常是不能接受的。
Filter的設(shè)置是在客戶端完成的,而Filter的邏輯是在HBase的服務(wù)端完成的,中間需要一次序列化。我試過幾種序列化方案,不過protobuffer以外的其他幾種效果不算好。HBase自帶的Filter也是用protobuffer進(jìn)行的序列化,因此使用protobuffer還可以少傳幾個(gè)包。
需要提前說明的已經(jīng)說完了,開始進(jìn)入正題。這次從一個(gè)案例開始說起:在HBase中存儲著用戶行為記錄,行鍵設(shè)計(jì)為“uid(6位)+etime(時(shí)間戳/1000)+tid(7位)+順序號(8位)”。其中uid為用戶ID、etime為事件時(shí)間、tid為行為標(biāo)簽。目標(biāo)是檢索出某個(gè)用戶在指定時(shí)間范圍內(nèi)的幾種行為數(shù)據(jù)。
針對這個(gè)案例我們自定義一個(gè)CustomRowKeyFilter,并將一個(gè)用戶ID、事件起止時(shí)間以及多個(gè)行為ID作為CustomRowKeyFilter的成員變量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
packagecom.zhyea.dev.hbase.filter;
importorg.apache.hadoop.hbase.Cell;
importorg.apache.hadoop.hbase.filter.FilterBase;
importorg.apache.hadoop.hbase.util.Bytes;
importjava.io.IOException;
publicclassCustomRowKeyFilterextendsFilterBase{
privatelongpid;
privatelongeventTime;
privateStringtids;
privatebooleanfilterOutRow=false;
publicCustomRowKeyFilter(long_pid,long_eventTime,String_tids){
this.pid=_pid;
this.eventTime=_eventTime;
this.tids=_tids;
}
@Override
publicbooleanfilterRowKey(byte[]data,intoffset,intlength){
StringrowKey=Bytes.toString(data,offset,length);
this.filterOutRow=check(rowKey);
returnthis.filterOutRow;
}
publicReturnCodefilterKeyValue(Cellv)throwsIOException{
if(this.filterOutRow){
returnReturnCode.NEXT_ROW;
}
returnReturnCode.INCLUDE;
}
privatebooleancheck(StringrowKey){
try{
if(rowKey.length()<7){
returntrue;
}
long_pid=Long.valueOf(rowKey.substring(0,6));
long_eTime=Long.valueOf(rowKey.substring(6,16));
long_tid=Long.valueOf(rowKey.substring(16,23));
if(this.pid!=_pid){
returntrue;
}
if(this.eventTime>_eTime){
returntrue;
}
if(!this.tids.contains(_tid+"")){
returntrue;
}
}catch(Exceptione){
returntrue;
}
returnfalse;
}
}
代碼中繼承了FilterBase類,可以減少一些結(jié)構(gòu)性的代碼工作。至于Filter是如何工作的,在網(wǎng)上找到的這張圖應(yīng)該描述得很清楚了:
前面的代碼只是實(shí)現(xiàn)了Filter的處理邏輯。要想使用這個(gè)Filter還需要做一些序列化處理。如前面所說序列化方案選擇的是protobuffer,這里需要先定義一個(gè)描述文件CustomRowKeyFilterProto.proto,內(nèi)容如下:
1
2
3
4
5
6
7
8
9
10
packagefilter;
optionjava_package="com.zhyea.dev.hbase.filter.proto";
optionjava_outer_classname="CustomRowKeyFilterProto";
messageCustomRowKeyFilter{
requiredint64pid=1;
requiredint64eventTime=2;
requiredstringtids=3;
}
定義完成后,執(zhí)行protoc命令:
1
protoc-I=./--java_out=../src/main/javaCustomRowKeyFilterProto.proto
其中“-I”指定了proto描述文件的父目錄, “—java_out”指定了java類的類路徑,具體請根據(jù)自己的情況進(jìn)行設(shè)置。執(zhí)行命令后會在包c(diǎn)om.zhyea.dev.hbase.filter.proto下生成序列化工具類CustomRowKeyFilterProto.java。
接下來在CustomRowKeyFilter中重寫Filter類的toByteArray()方法和parseFrom()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
publicbyte[]toByteArray()throwsIOException{
CustomRowKeyFilterProto.CustomRowKeyFilter.Builderbuilder=CustomRowKeyFilterProto.CustomRowKeyFilter.newBuilder();
builder.setPid(this.pid);
builder.setEventTime(this.eventTime);
builder.setCids(this.tids);
returnbuilder.build().toByteArray();
}
publicstaticFilterparseFrom(finalbyte[]pbBytes)throwsDeserializationException{
CustomRowKeyFilterProto.CustomRowKeyFilterproto;
try{
proto=CustomRowKeyFilterProto.CustomRowKeyFilter.parseFrom(pbBytes);
}catch(InvalidProtocolBufferExceptione){
thrownewDeserializationException(e);
}
long_pid=proto.getPid();
long_eventTime=proto.getEventTime();
String_tids=proto.getCids();
returnnewCustomRowKeyFilter(_pid,_eventTime,_tids);
}
這樣自定義Filter就完成了。剩下的事情就是將之打包并上傳到HBase(每個(gè)RegionServer)的類路徑下。然后就可以在程序中使用了。
現(xiàn)在再仔細(xì)想想這個(gè)程序,是否一定需要一個(gè)自定義Filter呢!我們已經(jīng)將查詢需要的所有元素都定義在行鍵里了。那么可以使用“uid+起始時(shí)間”作為startRow,“uid+結(jié)束時(shí)間”作為stopRow完成時(shí)間范圍的匹配,使用RegexStringComparator來處理tid的匹配,這樣直接使用HBase提供的RowFilter就能解決問題了。唯一需要注意的事情就是在設(shè)計(jì)表時(shí)多花些心思在行鍵上罷了。
就是這樣。
參考文檔
總結(jié)
以上是生活随笔為你收集整理的java自定义 filter,HBase自定义Filter的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java flatmap_Java 8
- 下一篇: sql数据导入错误代码: 0x80004