HBase 1.x Coprocessor使用指南
HBase 1.x Coprocessor使用指南
@(HBASE)[hbase]
- HBase 1x Coprocessor使用指南
- 一概述
- 1起因Why HBase Coprocessor
- 2靈感來源 Source of Inspration
- 3細節剖析Implementation
- 1觀察者Observer
- 2終端Endpoint
- 二Observer
- 一示例
- 1準備類文件
- 2部署方式一
- 1修改配置增加協處理器類
- 2將包含上述類的jar包入到hbase的lib目錄中
- 3重啟hbase
- 3部署方式二
- 1將包含上述類的jar包放到某個hdfs路徑
- 2創建表時指定coprocessor
- 4使用coproccessor
- 1在hbase shell中使用coprocessor
- 2使用java API使用coprocessor
- 5問題
- 一示例
- 三Endpoint
- 零業務常景描述
- 一準備proto文件
- 二使用protoc生成類文件
- 三實現真實的服務
- 1類的結構
- 2 getService
- 3 startCoprocessorEnvironment env
- 4stopCoprocessorEnvironment env
- 5 getCountAndSum
- 四部署coprocessor
- 五客戶端使用coprocessor
- 六運行程序
HBase在0.92版本之后,提供了協處理器功能。
在之前介紹過,HBase提供了過濾器,以減少從服務器返回客戶端的數據。而協處理器用于將部分處理工作交由RegionServer處理,而不是全部返回client再處理。
舉個例子,HBase的安全機制就是通過協處理器實現的。當用戶向HBase發出一個讀寫請求時,HBase會首先觸發這個協處理器,它會在讀寫操作前確認用戶是否有這個權限。
一、概述
(以下概述性的內容摘自網絡)。
1、起因(Why HBase Coprocessor)
HBase作為列族數據庫最經常被人詬病的特性包括:無法輕易建立“二級索引”,難以執行求和、計數、排序等操作。比如,在舊版本的(<0.92)Hbase中,統計數據表的總行數,需要使用Counter方法,執行一次MapReduce Job才能得到。雖然HBase在數據存儲層中集成了MapReduce,能夠有效用于數據表的分布式計算。然而在很多情況下,做一些簡單的相加或者聚合計算的時候,如果直接將計算過程放置在server端,能夠減少通訊開銷,從而獲得很好的性能提升。于是,HBase在0.92之后引入了協處理器(coprocessors),實現一些激動人心的新特性:能夠輕易建立二次索引、復雜過濾器(謂詞下推)以及訪問控制等。
2、靈感來源( Source of Inspration)
HBase協處理器的靈感來自于Jeff Dean 09年的演講( P66-67)。它根據該演講實現了類似于bigtable的協處理器,包括以下特性:
- 每個表服務器的任意子表都可以運行代碼
- 客戶端的高層調用接口(客戶端能夠直接訪問數據表的行地址,多行讀寫會自動分片成多個并行的RPC調用)
- 提供一個非常靈活的、可用于建立分布式服務的數據模型
- 能夠自動化擴展、負載均衡、應用請求路由
HBase的協處理器靈感來自bigtable,但是實現細節不盡相同。HBase建立了一個框架,它為用戶提供類庫和運行時環境,使得他們的代碼能夠在HBase region server和master上處理。
3、細節剖析(Implementation)
協處理器分兩種類型,系統協處理器可以全局導入region server上的所有數據表,表協處理器即是用戶可以指定一張表使用協處理器。協處理器框架為了更好支持其行為的靈活性,提供了兩個不同方面的插件。一個是觀察者(observer),類似于關系數據庫的觸發器。另一個是終端(endpoint),動態的終端有點像存儲過程。
3.1觀察者(Observer)
觀察者的設計意圖是允許用戶通過插入代碼來重載協處理器框架的upcall方法,而具體的事件觸發的callback方法由HBase的核心代碼來執行。協處理器框架處理所有的callback調用細節,協處理器自身只需要插入添加或者改變的功能。
HBase 提供了三種觀察者接口:
- RegionObserver:提供客戶端的數據操縱事件鉤子:Get、Put、Delete、Scan等。
- WALObserver:提供WAL相關操作鉤子。
- MasterObserver:提供DDL-類型的操作鉤子。如創建、刪除、修改數據表等。
這些接口可以同時使用在同一個地方,按照不同優先級順序執行.用戶可以任意基于協處理器實現復雜的HBase功能層。HBase有很多種事件可以觸發觀察者方法,這些事件與方法從HBase0.92版本起,都會集成在HBase API中。不過這些API可能會由于各種原因有所改動,不同版本的接口改動比較大,具體參考Java Doc。
3.2終端(Endpoint)
終端是動態RPC插件的接口,它的實現代碼被安裝在服務器端,從而能夠通過HBase RPC喚醒。客戶端類庫提供了非常方便的方法來調用這些動態接口,它們可以在任意時候調用一個終端,它們的實現代碼會被目標region遠程執行,結果會返回到終端。用戶可以結合使用這些強大的插件接口,為HBase添加全新的特性。
具體使用方法參考下面。
二、Observer
完整代碼請見:https://github.com/lujinhong/lujinhong-commons/tree/master/lujinhong-commons-hbase/src/main/java/com/lujinhong/commons/hbase/coprocessor
(一)示例
1、準備類文件
這個例子的coprocessor是一個RegionObserver,它判斷如果請求的rowkey是@@@GETTIME@@@,則返回系統當前時間,然后不再請求region讀取實際的數據(e.bypass()),否則,有可能返回2行。
幅使用的是preGetOp()方法,因此所有的Get操作都會先經過這個Coprocessor處理。
2、部署方式一
這種方法適用于集群管理人員使用,所部署的coprocessor會影響所有表,所有region。
(1)修改配置,增加協處理器類
<property><name>hbase.coprocessor.region.classes</name><value>org.apache.hadoop.hbase.security.token.TokenProvider,org.apache.hadoop.hbase.security.access.AccessController,com.lujinhong.commons.hbase.coprocessor.CoprocessorDemo</value></property>這里也可以看出來,安全相關的控制都使用協處理器完成的。
(2)將包含上述類的jar包入到hbase的lib目錄中
(3)重啟hbase
bin/rolling-restart.sh3、部署方式二
這種方式適合開發人員使用,只會影響特寫的表或者region。這種方式有可能導致開發人員濫用coprocessor,從而使得hbase集群負載過高,因此建議回收建表權限,只能由集群管理人員建表,并在建表時指定coprocessor。
這種方式無須重啟集群,從而達到熱加載的目的。
(1)將包含上述類的jar包放到某個hdfs路徑
如/hbase/userlib。當然也可以直接放在本地目錄,但要保證每臺hbase服務器都有這個類。
(2)創建表時,指定coprocessor
create 'ljhtest', 'f1' disable 'ljhtest' alter 'ljhtest', 'Coprocessor'=>'hdfs://testing/hbase/userlib/gdc-commons-hbase-0.1-SNAPSHOT.jar|com.lujinhong.commons.hbase.coprocessor.CoprocessorDemo|1073741825|arg1=1' enable 'ljhtest'注意jar包的權限,如果hbase用戶不能讀取這個jar包,會導致enable時失敗。
說明文檔如下:
hbase> alter 't1','coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'Since you can have multiple coprocessors configured for a table, a sequence number will be automatically appended to the attribute name to uniquely identify it.The coprocessor attribute must match the pattern below in order for the framework to understand how to load the coprocessor classes:[coprocessor jar file location] | class name | [priority] | [arguments]也可以在java代碼中通過HTableDescription來指定coprocessor。
4、使用coproccessor
(1)在hbase shell中使用coprocessor
hbase(main):007:0* get 'ljhtest3','@@@GETTIME@@@' COLUMN CELL@@@GETTIME@@@:@@@GETTIME@@@ timestamp=9223372036854775807, value=\x00\x00\x01T\xCE0=\x9E 1 row(s) in 0.1610 seconds就會返回當前時間。注意如果使用方式一部署的話,請求所有表均會返回正確結果,而使用方式二部署的話只有請求指定的表才會返回當前時間。
(2)使用java API使用coprocessor
一樣的,沒有什么特寫,正常讀取即可。
5、問題
在運行coprocessor時,若不失效,則到region所在的regionserver中查看日志,比如有些包沒打包上去等。
三、Endpoint
完整代碼請見:https://github.com/lujinhong/lujinhong-commons/tree/master/lujinhong-commons-hbase/src/main/java/com/lujinhong/commons/hbase/coprocessor
除了本例以外,還可以參考hbase源代碼中的RowCountEndpoint。
注意,hbase 0.98對實現endpoint的API作了很大的調整,《hbase權威指南》等書的API均不能再使用。
創建一個Endpoint的基本流程可以歸納為:
(1)創建一個通信協議:準備一個proto文件,然后使用protoc工具來生成協議類文件。這個文件需要在服務端及客戶端存在。
(2)創建一個Service類,實現具體的業務邏輯
(3)創建表時指定使用這個EndPoint,或者是全局配置。
(4)創建一個Client類,調用這個RPC方法。
(零)業務常景描述
HBase表中有一個family, 2相column,分別為f:c1, f:c2。rowkey為某個用戶id(當然經過hash以后以避免熱點),2個列分別表示這個用戶在2款產品的在線時間,單位為秒。如:
id1 column=f1:c1, timestamp=1464323601847, value=500000id1 column=f1:c2, timestamp=1464323601883, value=600000id2 column=f1:c1, timestamp=1464323648768, value=500id2 column=f1:c2, timestamp=1464323648758, value=600000 id3 column=f1:c1, timestamp=1464323648775, value=700000id3 column=f1:c2, timestamp=1464323648783, value=700id4 column=f1:c1, timestamp=1464324774802, value=700000id4 column=f1:c2, timestamp=1464324774845, value=800000下面就基于這些數據來計算。
要求計算:
(1)2個產品的真實用戶有多少,定義為在線時長超過10分鐘的
(2)這2個產品的真實用戶平均在線時長是多少
協處理器的處理邏輯為:
(1)請求為這2個列名,萬一上線新產品時可以直接使用,也可示范如何定義一個request。
(2)計算2個產品的真實用戶count1, count2
(3)計算這2個產品用戶分別的在線總時長sum1, sum2
(4)將各個region的sum和count分別加起來后,計算2個產品的平均值。
(一)準備proto文件
請求參數為列的名稱,用“;”分隔,返回的是這2個產品的真實用戶數與在線總時長。
option java_package = "com.lujinhong.coprocessor"; option java_outer_classname = "MultiColumnSumProtocol"; option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED;message CountRequest {required string columns = 1; }message CountResponse {required int64 count1 = 1 [default = 0];required int64 count2 = 2 [default = 0];required int64 sum1 = 3 [default = 0];required int64 sum2 = 4 [default = 0]; }service RowCountService {rpc getCountAndSum(CountRequest)returns (CountResponse); }(二)使用protoc生成類文件
protoc --java_out=../java/ MultiColumnSum.proto這個命令在使用上面的proto文件生成相應的類文件,這個類文件有幾個地方需要注意:
1、生成了一個CountRequest內部類,表示請求信息
2、生成了一個CountResponse內部類,表示返回信息
3、生成了一個 RowCountService內部類,表示所提供的服務,這個類還有一個內部接口,這個接口定義了 getCountAndSum()這個方法。
我們下面需要做的就是實現這個接口的這個方法,提供真正的服務。
(三)實現真實的服務
1、類的結構
提供真實服務的類繼承自上面自動生成的Server類,同時需要實現Coprocessor和CoprocessorService2個接口:
public class MultiColumnSum extends MultiColumnSumProtocol.RowCountService implements Coprocessor, CoprocessorService它需要實現以下4個方法,下面我們逐一討論一下:
@Override public Service getService() { return null; }@Override public void start(CoprocessorEnvironment env) throws IOException { }@Override public void stop(CoprocessorEnvironment env) throws IOException { }@Override public void getCountAndSum(RpcController controller, CountRequest request, RpcCallback<CountResponse> done) { }2、 getService()
這個方法直接返回自身即可。
@Override public Service getService() { return this; }3、 start(CoprocessorEnvironment env)
這個方法會在coprocessor啟動時調用,這里判斷了是否在一個region內被使用,而不是master,WAL等環境下被調用。
@Override public void start(CoprocessorEnvironment env) throws IOException {if (env instanceof RegionCoprocessorEnvironment) {this.env = (RegionCoprocessorEnvironment) env;} else {throw new CoprocessorException("Must be loaded on a table region!");} }4、stop(CoprocessorEnvironment env)
這個方法會在coprocessor完成時被調用,可用于關閉資源等,這里為空。
@Overridepublic void stop(CoprocessorEnvironment env) throws IOException { }5、 getCountAndSum(…)
這是整個類的核心方法,用于實現真正的業務邏輯。關鍵的步驟有:
(1)根據request創建一個Scanner,然后使用它創建一個 InternalScanner,可以更高效的進行scan
(2)對掃描出來的行進行分析處理,將結果保存在幾個變量中。
(3)調用response的各個set()方法,設置返回的結果。
(4)使用 done.run(response); 返回結果到客戶端。
這個方法的完整代碼如下:
(四)部署coprocessor
將上述2個類進行打包,然后按照上面Oberver部分介紹的部署方法來部署coprocessor。
(五)客戶端使用coprocessor
注意,如果很多代碼用到這個coprocessor,最好封裝成更方便調用的方式。
最核心的代碼是:
Map<byte[], ResponseInfo> map = table.coprocessorService(MultiColumnSumProtocol.RowCountService.class, null,null, new Batch.Call<MultiColumnSumProtocol.RowCountService, ResponseInfo>() {@Overridepublic ResponseInfo call(MultiColumnSumProtocol.RowCountService service) throws IOException {BlockingRpcCallback<MultiColumnSumProtocol.CountResponse> rpcCallback = new BlockingRpcCallback<>();service.getCountAndSum(null, request, rpcCallback);MultiColumnSumProtocol.CountResponse response = rpcCallback.get();ResponseInfo responseInfo = new ResponseInfo();responseInfo.count1 = response.getCount1();responseInfo.count2 = response.getCount2();responseInfo.sum1 = response.getSum1();responseInfo.sum2 = response.getSum2();return responseInfo;}});將調用的結果返回保存在一個map中,每個region會產生一條數據。然后通過合并各個region的結果來得出最終的結果即可。
ResponseInfo result = new ResponseInfo(); for (ResponseInfo ri : map.values()) {result.count1 += ri.count1;result.count2 += ri.count2;result.sum1 += ri.sum1;result.sum2 += ri.sum2; }System.out.println("Produce 1 has " + result.count1 + " user, all online time is " + result.sum1 / 1000+ " minutes, average online time is " + result.sum1 / 1000 / result.count1 + "minutes.");System.out.println("Produce 2 has " + result.count2 + " user, all online time is " + result.sum2 / 1000+ " minutes, average online time is " + result.sum2 / 1000 / result.count2 + "minutes.");(六)運行程序
打包并上傳至集群,然后運行程序。
注意只打包client類與protocol類,不要打包Service類。即部署到集群的jar包包括Service類和protocol類,而運行任務的jar包包括client類與protocol類。
輸出結果為:
Produce 1 has 3 user, all online time is 1900 minutes, average online time is 633minutes. Produce 2 has 3 user, all online time is 2000 minutes, average online time is 666minutes.總結
以上是生活随笔為你收集整理的HBase 1.x Coprocessor使用指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: UncaughtExceptionHan
- 下一篇: 在hadoop/hbase等代码中kin