HBase性能优化方法总结(三):读表操作
3. 讀表操作
3.1 多HTable并發(fā)讀
創(chuàng)建多個HTable客戶端用于讀操作,提高讀數(shù)據(jù)的吞吐量,一個例子:
?
static?final?Configuration conf = HBaseConfiguration.create();
static?final?String table_log_name = “user_log”;
rTableLog = new?HTable[tableN];
for?(int?i = 0; i < tableN; i++) {
????rTableLog[i] = new?HTable(conf, table_log_name);
????rTableLog[i].setScannerCaching(50);
}
?
3.2 HTable參數(shù)設(shè)置
3.2.1 Scanner Caching
hbase.client.scanner.caching配置項可以設(shè)置HBase scanner一次從服務(wù)端抓取的數(shù)據(jù)條數(shù),默認(rèn)情況下一次一條。通過將其設(shè)置成一個合理的值,可以減少scan過程中next()的時間開銷,代價是scanner需要通過客戶端的內(nèi)存來維持這些被cache的行記錄。
有三個地方可以進(jìn)行配置:1)在HBase的conf配置文件中進(jìn)行配置;2)通過調(diào)用HTable.setScannerCaching(int scannerCaching)進(jìn)行配置;3)通過調(diào)用Scan.setCaching(int caching)進(jìn)行配置。三者的優(yōu)先級越來越高。
3.2.2 Scan Attribute Selection
scan時指定需要的Column Family,可以減少網(wǎng)絡(luò)傳輸數(shù)據(jù)量,否則默認(rèn)scan操作會返回整行所有Column Family的數(shù)據(jù)。
3.2.3 Close ResultScanner
通過scan取完數(shù)據(jù)后,記得要關(guān)閉ResultScanner,否則RegionServer可能會出現(xiàn)問題(對應(yīng)的Server資源無法釋放)。
3.3 批量讀
通過調(diào)用HTable.get(Get)方法可以根據(jù)一個指定的row key獲取一行記錄,同樣HBase提供了另一個方法:通過調(diào)用HTable.get(List<Get>)方法可以根據(jù)一個指定的row key列表,批量獲取多行記錄,這樣做的好處是批量執(zhí)行,只需要一次網(wǎng)絡(luò)I/O開銷,這對于對數(shù)據(jù)實時性要求高而且網(wǎng)絡(luò)傳輸RTT高的情景下可能帶來明顯的性能提升。
3.4 多線程并發(fā)讀
在客戶端開啟多個HTable讀線程,每個讀線程負(fù)責(zé)通過HTable對象進(jìn)行g(shù)et操作。下面是一個多線程并發(fā)讀取HBase,獲取店鋪一天內(nèi)各分鐘PV值的例子:
?
public?class?DataReaderServer {
?????//獲取店鋪一天內(nèi)各分鐘PV值的入口函數(shù)
?????public?static?ConcurrentHashMap<String, String> getUnitMinutePV(long?uid, long?startStamp, long?endStamp){
?????????long?min = startStamp;
?????????int?count = (int)((endStamp - startStamp) / (60*1000));
?????????List<String> lst = new?ArrayList<String>();
?????????for?(int?i = 0; i <= count; i++) {
????????????min = startStamp + i * 60 * 1000;
????????????lst.add(uid + "_" + min);
?????????}
?????????return?parallelBatchMinutePV(lst);
?????}
??????//多線程并發(fā)查詢,獲取分鐘PV值
private?static?ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){
????????ConcurrentHashMap<String, String> hashRet = new?ConcurrentHashMap<String, String>();
????????int?parallel = 3;
????????List<List<String>> lstBatchKeys ?= null;
????????if?(lstKeys.size() < parallel ){
????????????lstBatchKeys ?= new?ArrayList<List<String>>(1);
????????????lstBatchKeys.add(lstKeys);
????????}
????????else{
????????????lstBatchKeys ?= new?ArrayList<List<String>>(parallel);
????????????for(int?i = 0; i < parallel; i++ ?){
????????????????List<String> lst = new?ArrayList<String>();
????????????????lstBatchKeys.add(lst);
????????????}
????????????for(int?i = 0 ; i < lstKeys.size() ; i ++ ){
????????????????lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
????????????}
????????}
????????
????????List<Future< ConcurrentHashMap<String, String> >> futures = new?ArrayList<Future< ConcurrentHashMap<String, String> >>(5);
????????
????????ThreadFactoryBuilder builder = new?ThreadFactoryBuilder();
????????builder.setNameFormat("ParallelBatchQuery");
????????ThreadFactory factory = builder.build();
????????ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
????????
????????for(List<String> keys : lstBatchKeys){
????????????Callable< ConcurrentHashMap<String, String> > callable = new?BatchMinutePVCallable(keys);
????????????FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable);
????????????futures.add(future);
????????}
????????executor.shutdown();
????????
????????// Wait for all the tasks to finish
????????try?{
??????????boolean?stillRunning = !executor.awaitTermination(
??????????????5000000, TimeUnit.MILLISECONDS);
??????????if?(stillRunning) {
????????????try?{
????????????????executor.shutdownNow();
????????????} catch?(Exception e) {
????????????????// TODO Auto-generated catch block
????????????????e.printStackTrace();
????????????}
??????????}
????????} catch?(InterruptedException e) {
??????????try?{
??????????????Thread.currentThread().interrupt();
??????????} catch?(Exception e1) {
????????????// TODO Auto-generated catch block
????????????e1.printStackTrace();
??????????}
????????}
????????
????????// Look for any exception
????????for?(Future f : futures) {
??????????try?{
??????????????if(f.get() != null)
??????????????{
??????????????????hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
??????????????}
??????????} catch?(InterruptedException e) {
????????????try?{
?????????????????Thread.currentThread().interrupt();
????????????} catch?(Exception e1) {
????????????????// TODO Auto-generated catch block
????????????????e1.printStackTrace();
????????????}
??????????} catch?(ExecutionException e) {
????????????e.printStackTrace();
??????????}
????????}
????????
????????return?hashRet;
????}
?????//一個線程批量查詢,獲取分鐘PV值
????protected?static?ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){
????????ConcurrentHashMap<String, String> hashRet = null;
????????List<Get> lstGet = new?ArrayList<Get>();
????????String[] splitValue = null;
????????for?(String s : lstKeys) {
????????????splitValue = s.split("_");
????????????long?uid = Long.parseLong(splitValue[0]);
????????????long?min = Long.parseLong(splitValue[1]);
????????????byte[] key = new?byte[16];
????????????Bytes.putLong(key, 0, uid);
????????????Bytes.putLong(key, 8, min);
????????????Get g = new?Get(key);
????????????g.addFamily(fp);
????????????lstGet.add(g);
????????}
????????Result[] res = null;
????????try?{
????????????res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
????????} catch?(IOException e1) {
????????????logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
????????}
????????if?(res != null?&& res.length > 0) {
????????????hashRet = new?ConcurrentHashMap<String, String>(res.length);
????????????for?(Result re : res) {
????????????????if?(re != null?&& !re.isEmpty()) {
????????????????????try?{
????????????????????????byte[] key = re.getRow();
????????????????????????byte[] value = re.getValue(fp, cp);
????????????????????????if?(key != null?&& value != null) {
????????????????????????????hashRet.put(String.valueOf(Bytes.toLong(key,
????????????????????????????????????Bytes.SIZEOF_LONG)), String.valueOf(Bytes
????????????????????????????????????.toLong(value)));
????????????????????????}
????????????????????} catch?(Exception e2) {
????????????????????????logger.error(e2.getStackTrace());
????????????????????}
????????????????}
????????????}
????????}
????????return?hashRet;
????}
}
//調(diào)用接口類,實現(xiàn)Callable接口
class?BatchMinutePVCallable implements?Callable<ConcurrentHashMap<String, String>>{
?????private?List<String> keys;
?????public?BatchMinutePVCallable(List<String> lstKeys ) {
?????????this.keys = lstKeys;
?????}
?????public?ConcurrentHashMap<String, String> call() throws?Exception {
?????????return?DataReadServer.getBatchMinutePV(keys);
?????}
}
3.5 緩存查詢結(jié)果
對于頻繁查詢HBase的應(yīng)用場景,可以考慮在應(yīng)用程序中做緩存,當(dāng)有新的查詢請求時,首先在緩存中查找,如果存在則直接返回,不再查詢HBase;否則對HBase發(fā)起讀請求查詢,然后在應(yīng)用程序中將查詢結(jié)果緩存起來。至于緩存的替換策略,可以考慮LRU等常用的策略。
3.6 Blockcache
HBase上Regionserver的內(nèi)存分為兩個部分,一部分作為Memstore,主要用來寫;另外一部分作為BlockCache,主要用于讀。
寫請求會先寫入Memstore,Regionserver會給每個region提供一個Memstore,當(dāng)Memstore滿64MB以后,會啟動 flush刷新到磁盤。當(dāng)Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強(qiáng)行啟動flush進(jìn)程,從最大的Memstore開始flush直到低于限制。
讀請求先到Memstore中查數(shù)據(jù),查不到就到BlockCache中查,再查不到就會到磁盤上讀,并把讀的結(jié)果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache達(dá)到上限(heapsize * hfile.block.cache.size * 0.85)后,會啟動淘汰機(jī)制,淘汰掉最老的一批數(shù)據(jù)。
一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大于等于heapsize * 0.8,否則HBase不能啟動。默認(rèn)BlockCache為0.2,而Memstore為0.4。對于注重讀響應(yīng)時間的系統(tǒng),可以將?BlockCache設(shè)大些,比如設(shè)置BlockCache=0.4,Memstore=0.39,以加大緩存的命中率。
有關(guān)BlockCache機(jī)制,請參考這里:HBase的Block cache,HBase的blockcache機(jī)制,hbase中的緩存的計算與使用。
?
?
HTable和HTablePool使用注意事項
HTable和HTablePool都是HBase客戶端API的一部分,可以使用它們對HBase表進(jìn)行CRUD操作。下面結(jié)合在項目中的應(yīng)用情況,對二者使用過程中的注意事項做一下概括總結(jié)。
Configuration?conf = HBaseConfiguration.create();
try?(Connection?connection = ConnectionFactory.createConnection(conf)) {
??try?(Table table = connection.getTable(TableName.valueOf(tablename)) {
????// use table as needed, the table returned is lightweight
??}
}
?
HTable
HTable是HBase客戶端與HBase服務(wù)端通訊的Java API對象,客戶端可以通過HTable對象與服務(wù)端進(jìn)行CRUD操作(增刪改查)。它的創(chuàng)建很簡單:
Configuration conf = HBaseConfiguration.create();
HTable table = new?HTable(conf, "tablename");
//TODO CRUD Operation……
HTable使用時的一些注意事項:
1.???規(guī)避HTable對象的創(chuàng)建開銷
因為客戶端創(chuàng)建HTable對象后,需要進(jìn)行一系列的操作:檢查.META.表確認(rèn)指定名稱的HBase表是否存在,表是否有效等等,整個時間開銷比較重,可能會耗時幾秒鐘之長,因此最好在程序啟動時一次性創(chuàng)建完成需要的HTable對象,如果使用Java API,一般來說是在構(gòu)造函數(shù)中進(jìn)行創(chuàng)建,程序啟動后直接重用。
2.???HTable對象不是線程安全的
HTable對象對于客戶端讀寫數(shù)據(jù)來說不是線程安全的,因此多線程時,要為每個線程單獨創(chuàng)建復(fù)用一個HTable對象,不同對象間不要共享HTable對象使用,特別是在客戶端auto flash被置為false時,由于存在本地write buffer,可能導(dǎo)致數(shù)據(jù)不一致。
3.???HTable對象之間共享Configuration
HTable對象共享Configuration對象,這樣的好處在于:
- 共享ZooKeeper的連接:每個客戶端需要與ZooKeeper建立連接,查詢用戶的table regions位置,這些信息可以在連接建立后緩存起來共享使用;
- 共享公共的資源:客戶端需要通過ZooKeeper查找-ROOT-和.META.表,這個需要網(wǎng)絡(luò)傳輸開銷,客戶端緩存這些公共資源后能夠減少后續(xù)的網(wǎng)絡(luò)傳輸開銷,加快查找過程速度。
因此,與以下這種方式相比:
HTable table1 = new?HTable("table1");
HTable table2 = new?HTable("table2");
下面的方式更有效些:
Configuration conf = HBaseConfiguration.create();
HTable table1 = new?HTable(conf, "table1");
HTable table2 = new?HTable(conf, "table2");
備注:即使是高負(fù)載的多線程程序,也并沒有發(fā)現(xiàn)因為共享Configuration而導(dǎo)致的性能問題;如果你的實際情況中不是如此,那么可以嘗試不共享Configuration。
HTablePool
HTablePool可以解決HTable存在的線程不安全問題,同時通過維護(hù)固定數(shù)量的HTable對象,能夠在程序運行期間復(fù)用這些HTable資源對象。
Configuration conf = HBaseConfiguration.create();
HTablePool pool = new?HTablePool(conf, 10);
1.?? HTablePool可以自動創(chuàng)建HTable對象,而且對客戶端來說使用上是完全透明的,可以避免多線程間數(shù)據(jù)并發(fā)修改問題。
2.?? HTablePool中的HTable對象之間是公用Configuration連接的,能夠可以減少網(wǎng)絡(luò)開銷。
HTablePool的使用很簡單:每次進(jìn)行操作前,通過HTablePool的getTable方法取得一個HTable對象,然后進(jìn)行put/get/scan/delete等操作,最后通過HTablePool的putTable方法將HTable對象放回到HTablePool中。
下面是個使用HTablePool的簡單例子:
?
public?void?createUser(String username, String firstName, String lastName, String email, String password, String roles) throws?IOException {
HTable table = rm.getTable(UserTable.NAME);
Put put = new?Put(Bytes.toBytes(username));
put.add(UserTable.DATA_FAMILY, UserTable.FIRSTNAME,
Bytes.toBytes(firstName));
put.add(UserTable.DATA_FAMILY, UserTable.LASTNAME,
Bytes.toBytes(lastName));
put.add(UserTable.DATA_FAMILY, UserTable.EMAIL, Bytes.toBytes(email));
put.add(UserTable.DATA_FAMILY, UserTable.CREDENTIALS,
Bytes.toBytes(password));
put.add(UserTable.DATA_FAMILY, UserTable.ROLES, Bytes.toBytes(roles));
table.put(put);
table.flushCommits();
rm.putTable(table);
}
?
?
Hbase和DBMS比較:
查詢數(shù)據(jù)不靈活:
?
總結(jié)
以上是生活随笔為你收集整理的HBase性能优化方法总结(三):读表操作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HBase性能优化方法总结(二):写表操
- 下一篇: 怎么对Java服务进行调优的?