HBase Compaction Source Code Analysis - CompactionChecker
Other related articles

HBase Compaction Source Code Analysis - CompactionChecker
HBase Compaction Source Code Analysis - RatioBasedCompactionPolicy
HBase Compaction Source Code Analysis - CompactSplitThread Thread Pool Selection
CompactionChecker
Introduction:
The RegionServer starts a background thread, CompactionChecker, that periodically checks whether each Store needs a compaction. The check period is hbase.server.thread.wakefrequency * hbase.server.compactchecker.interval.multiplier. Unlike flush, this thread first checks whether the total number of files in the Store exceeds the compaction threshold hbase.hstore.compactionThreshold; if so, a compaction is triggered. If not, it then checks whether the major compaction condition is met: in short, if the earliest update time of the HFiles in the current Store is earlier than some value mcTime, a major compaction is triggered. mcTime is a floating value whose default range is [7 - 7*0.5, 7 + 7*0.5] days, where 7 is hbase.hregion.majorcompaction and 0.5 is hbase.hregion.majorcompaction.jitter, so by default a major compaction runs roughly every 7 days. To disable major compactions, set hbase.hregion.majorcompaction to 0.
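To make the timing concrete, here is a minimal, self-contained sketch (not HBase source; class and variable names are illustrative) of how the check period and the jittered mcTime window follow from these configuration values:

```java
import java.util.concurrent.ThreadLocalRandom;

public class CompactionTimingSketch {
    public static void main(String[] args) {
        // Check period = hbase.server.thread.wakefrequency * hbase.server.compactchecker.interval.multiplier
        long wakeFrequencyMs = 10 * 1000L;   // default 10s
        long intervalMultiplier = 1000L;     // default 1000
        long checkPeriodMs = wakeFrequencyMs * intervalMultiplier;
        System.out.println("Stores checked every " + checkPeriodMs / 1000 + "s (~2.77h)");

        // mcTime floats uniformly in [period - period*jitter, period + period*jitter]
        long majorPeriodMs = 7L * 24 * 60 * 60 * 1000;  // hbase.hregion.majorcompaction = 7 days
        double jitter = 0.5;                            // hbase.hregion.majorcompaction.jitter
        double offset = (ThreadLocalRandom.current().nextDouble() - 0.5) * 2 * jitter;
        long mcTimeMs = majorPeriodMs + (long) (majorPeriodMs * offset);
        System.out.printf("Next major compaction due after ~%.1f days%n",
            mcTimeMs / (24 * 3600 * 1000.0));
    }
}
```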
Source code analysis:
The org.apache.hadoop.hbase.regionserver.HRegionServer class contains a compactionChecker field. Its type implements the Runnable interface, and it runs as an independent background thread that checks whether a compaction needs to be executed.
```java
/** Check for compactions requests. */
ScheduledChore compactionChecker;
```

CompactionChecker is an inner class of HRegionServer; its constructor looks like this:
```java
CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
  super("CompactionChecker", stopper, sleepTime);
  this.instance = h;
  LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
  /* MajorCompactPriority is configurable.
   * If not set, the compaction will use default priority. */
  // Major compaction priority, read from
  // hbase.regionserver.compactionChecker.majorCompactPriority, default Integer.MAX_VALUE
  this.majorCompactPriority = this.instance.conf.getInt(
      "hbase.regionserver.compactionChecker.majorCompactPriority",
      DEFAULT_PRIORITY);
}
```

It takes three arguments: the HRegionServer, the sleep time, and a Stoppable (if the RegionServer stops running, the CompactionChecker notices this and stops checking for compactions).
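The priority lookup in the constructor is a plain Hadoop Configuration read. A minimal, self-contained sketch of the same lookup, assuming only hadoop-common on the classpath (the class name here is made up):

```java
import org.apache.hadoop.conf.Configuration;

public class MajorCompactPrioritySketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same fallback behaviour as the constructor above: when
        // hbase.regionserver.compactionChecker.majorCompactPriority is unset,
        // DEFAULT_PRIORITY (Integer.MAX_VALUE) is used.
        int majorCompactPriority = conf.getInt(
            "hbase.regionserver.compactionChecker.majorCompactPriority",
            Integer.MAX_VALUE);
        System.out.println("majorCompactPriority = " + majorCompactPriority);
    }
}
```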
It also invokes the parent class constructor:
super("CompactionChecker", stopper, sleepTime);我們看下父類方法 實現了Runnable接口,這里我們直接看run方法具體運行方法
Notice that on the first run it executes initialChore(), which does nothing but return true; every run after that executes chore(), which is implemented in the CompactionChecker class.
```java
public abstract class ScheduledChore implements Runnable {
  @Override
  public void run() {
    updateTimeTrackingBeforeRun();
    if (missedStartTime() && isScheduled()) {
      onChoreMissedStartTime();
      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
    } else if (stopper.isStopped() || !isScheduled()) {
      cancel(false);
      cleanup();
      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
    } else {
      try {
        if (!initialChoreComplete) {
          initialChoreComplete = initialChore();
        } else {
          chore();
        }
      } catch (Throwable t) {
        if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
        if (this.stopper.isStopped()) {
          cancel(false);
          cleanup();
        }
      }
    }
  }
}
```
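The first-tick-versus-later-ticks pattern above is easy to reproduce outside HBase. A self-contained sketch (illustrative names, plain JDK scheduling rather than HBase's ChoreService):

```java
// The first tick runs initialChore(), every later tick runs chore().
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ChorePatternSketch implements Runnable {
    private boolean initialChoreComplete = false;

    protected boolean initialChore() {   // like ScheduledChore's default: a no-op
        System.out.println("initialChore: nothing to do, return true");
        return true;
    }

    protected void chore() {
        System.out.println("chore: periodic compaction check would run here");
    }

    @Override
    public void run() {
        if (!initialChoreComplete) {
            initialChoreComplete = initialChore();
        } else {
            chore();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.scheduleAtFixedRate(new ChorePatternSketch(), 0, 1, TimeUnit.SECONDS);
        Thread.sleep(3500);   // observe the initial tick plus ~3 chore() ticks
        pool.shutdown();
    }
}
```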
Now let's look at the chore method of the CompactionChecker class.
This method mainly invokes the relevant policy methods to decide whether a compaction is needed; the concrete policies are covered below.
```java
private static class CompactionChecker extends ScheduledChore {
  private final HRegionServer instance;
  private final int majorCompactPriority;
  private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
  // Iteration is 1-based rather than 0-based so we don't check for compaction
  // immediately upon region server startup
  private long iteration = 1;

  // The sleepTime passed in above is hbase.server.thread.wakefrequency = 10 * 1000
  CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
    super("CompactionChecker", stopper, sleepTime);
    this.instance = h;
    LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
    /* MajorCompactPriority is configurable.
     * If not set, the compaction will use default priority. */
    // Major compaction priority, read from
    // hbase.regionserver.compactionChecker.majorCompactPriority, default Integer.MAX_VALUE
    this.majorCompactPriority = this.instance.conf.getInt(
        "hbase.regionserver.compactionChecker.majorCompactPriority",
        DEFAULT_PRIORITY);
  }

  @Override
  protected void chore() {
    // onlineRegions.values() is the collection of regions currently online
    // on this RegionServer
    for (Region r : this.instance.onlineRegions.values()) {
      if (r == null)
        continue;
      // r.getStores() returns all Stores of the region
      // (one Store per column family)
      for (Store s : r.getStores()) {
        try {
          // multiplier = hbase.server.compactchecker.interval.multiplier,
          // initialized in the HStore constructor, default 1000
          long multiplier = s.getCompactionCheckMultiplier();
          // Assert the value is sane
          assert multiplier > 0;
          // iteration starts at 1 and is incremented on every scheduled run;
          // the body below only executes when iteration is a multiple of
          // multiplier. With multiplier = 1000 and chore running every
          // hbase.server.thread.wakefrequency = 10s (10 * 1000 ms), that is
          // every 10s * 1000 = 10,000s ≈ 2.77 hours.
          if (iteration % multiplier != 0) continue;
          // If a compaction is needed, issue a SystemCompaction request.
          // Ultimately this compares whether the current storefile count minus
          // the files already compacting exceeds the configured compaction min
          // (see the needsCompaction method of RatioBasedCompactionPolicy);
          // if so, a system compaction runs.
          if (s.needsCompaction()) {
            // Queue a compaction. Will recognize if major is needed.
            this.instance.compactSplitThread.requestSystemCompaction(r, s,
                getName() + " requests compaction");
          } else if (s.isMajorCompaction()) {
            // Check whether a major compaction is due
            s.triggerMajorCompaction();
            if (majorCompactPriority == DEFAULT_PRIORITY
                || majorCompactPriority > ((HRegion) r).getCompactPriority()) {
              this.instance.compactSplitThread.requestCompaction(r, s,
                  getName() + " requests major compaction; use default priority", null);
            } else {
              this.instance.compactSplitThread.requestCompaction(r, s,
                  getName() + " requests major compaction; use configured priority",
                  this.majorCompactPriority, null, null);
            }
          }
        } catch (IOException e) {
          LOG.warn("Failed major compaction check on " + r, e);
        }
      }
    }
    iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
  }
}
```
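The priority branch at the end of chore() is worth spelling out: a configured majorCompactPriority is only used when it is set and not higher than the region's current compact priority. An illustrative sketch of just that condition (simplified stand-ins, not the HBase types):

```java
public class PriorityBranchSketch {
    static final int DEFAULT_PRIORITY = Integer.MAX_VALUE;

    // Mirrors the if/else in chore(): default priority wins unless the
    // configured value is set and <= the region's compact priority.
    static String choosePriority(int majorCompactPriority, int regionCompactPriority) {
        if (majorCompactPriority == DEFAULT_PRIORITY
            || majorCompactPriority > regionCompactPriority) {
            return "default priority (" + regionCompactPriority + ")";
        }
        return "configured priority (" + majorCompactPriority + ")";
    }

    public static void main(String[] args) {
        System.out.println(choosePriority(DEFAULT_PRIORITY, 3)); // unset -> default
        System.out.println(choosePriority(5, 3));                // 5 > 3 -> default
        System.out.println(choosePriority(1, 3));                // 1 <= 3 -> configured
    }
}
```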
needsCompaction method
```java
@Override
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
    final List<StoreFile> filesCompacting) {
  // Is (current storeFiles count - files currently compacting) >= minFilesToCompact?
  // minFilesToCompact = Math.max(2, conf.getInt(HBASE_HSTORE_COMPACTION_MIN_KEY,
  //     /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
  // If the number of candidate files reaches the configured value, return true
  // and a compaction proceeds.
  int numCandidates = storeFiles.size() - filesCompacting.size();
  return numCandidates >= comConf.getMinFilesToCompact();
}
```
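A worked example of this arithmetic (illustrative, not HBase source), using the default minFilesToCompact derived from hbase.hstore.compactionThreshold = 3:

```java
public class NeedsCompactionSketch {
    static boolean needsCompaction(int storeFiles, int filesCompacting, int minFilesToCompact) {
        int numCandidates = storeFiles - filesCompacting;
        return numCandidates >= minFilesToCompact;
    }

    public static void main(String[] args) {
        // minFilesToCompact = max(2, hbase.hstore.compaction.min,
        //                         falling back to hbase.hstore.compactionThreshold = 3)
        int minFilesToCompact = Math.max(2, 3);
        System.out.println(needsCompaction(5, 1, minFilesToCompact)); // true  (4 >= 3)
        System.out.println(needsCompaction(3, 1, minFilesToCompact)); // false (2 <  3)
    }
}
```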
isMajorCompaction method

As you can see, it calls the shouldPerformMajorCompaction method of storeEngine.getCompactionPolicy(): storeEngine.getCompactionPolicy() returns the compaction policy in effect, and that policy's shouldPerformMajorCompaction method is then invoked. The analysis here covers the RatioBasedCompactionPolicy.
```java
@Override
public boolean isMajorCompaction() throws IOException {
  for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
    // TODO: what are these reader checks all over the place?
    if (sf.getReader() == null) {
      LOG.debug("StoreFile " + sf + " has null Reader");
      return false;
    }
  }
  return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
      this.storeEngine.getStoreFileManager().getStorefiles());
}
```
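The delegation here is a textbook strategy pattern: the store engine hands back whichever compaction policy is configured, and the store defers the decision to it. A reduced sketch with simplified stand-in interfaces (not the HBase types):

```java
import java.util.Collection;
import java.util.List;

public class PolicyDelegationSketch {
    interface CompactionPolicy {
        boolean shouldPerformMajorCompaction(Collection<String> storeFiles);
    }

    static class RatioBasedPolicy implements CompactionPolicy {
        @Override
        public boolean shouldPerformMajorCompaction(Collection<String> storeFiles) {
            // The real decision logic lives in shouldPerformMajorCompaction, analysed below
            return !storeFiles.isEmpty();
        }
    }

    public static void main(String[] args) {
        // Stands in for storeEngine.getCompactionPolicy()
        CompactionPolicy policy = new RatioBasedPolicy();
        System.out.println(policy.shouldPerformMajorCompaction(List.of("hfile-1", "hfile-2")));
    }
}
```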
shouldPerformMajorCompaction method

This method returns whether a major compaction should be performed.
The source examined here is from HBase 1.4.10, and this method contains a bug: the `result = true;` on the second-to-last line of the block unconditionally overwrites the result, which defeats one of the checks (the single-file skip case). Checking the 1.4.13 and later sources shows the problem has since been fixed.
```java
@Override
public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
    throws IOException {
  boolean result = false;
  // Get the time of the next major compaction; this is a floating value in
  // [7 - 7*0.5, 7 + 7*0.5] days:
  //   hbase.hregion.majorcompaction = 7 days
  //   hbase.hregion.majorcompaction.jitter = 0.5
  long mcTime = getNextMajorCompactTime(filesToCompact);
  if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
    return result;
  }
  // TODO: Use better method for determining stamp of last major (HBASE-2990)
  // Earliest modification time among the candidate files
  long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
  // Current time
  long now = EnvironmentEdgeManager.currentTime();
  // If the oldest file is older than now - mcTime (i.e. no major compaction
  // has run within the mcTime window), continue; otherwise skip the major.
  if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
    // Once we get here a major compaction will (in this version) definitely
    // be triggered; the checks below mostly exist for debug output.
    String regionInfo;
    if (this.storeConfigInfo != null && this.storeConfigInfo instanceof HStore) {
      regionInfo = ((HStore) this.storeConfigInfo).getRegionInfo().getRegionNameAsString();
    } else {
      regionInfo = this.toString();
    }
    // Major compaction time has elapsed.
    long cfTTL = HConstants.FOREVER;
    if (this.storeConfigInfo != null) {
      // Store file time-to-live
      cfTTL = this.storeConfigInfo.getStoreFileTtl();
    }
    if (filesToCompact.size() == 1) {
      // Single file
      StoreFile sf = filesToCompact.iterator().next();
      // Minimum timestamp in the file
      Long minTimestamp = sf.getMinimumTimestamp();
      // Age of the file
      long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
      if (sf.isMajorCompaction() && (cfTTL == Long.MAX_VALUE || oldest < cfTTL)) {
        // File has not expired
        float blockLocalityIndex = sf.getHDFSBlockDistribution()
            .getBlockLocalityIndex(RSRpcServices.getHostname(comConf.conf, false));
        if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
          // Poor locality: compact to make the HDFS blocks local
          LOG.debug("Major compaction triggered on only store " + regionInfo
              + "; to make hdfs blocks local, current blockLocalityIndex is "
              + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
          result = true;
        } else {
          // Skip the compaction
          LOG.debug("Skipping major compaction of " + regionInfo
              + " because one (major) compacted file only, oldestTime " + oldest
              + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
              + " (min " + comConf.getMinLocalityToForceCompact() + ")");
        }
      } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
        // The store file has expired: trigger a major compaction
        LOG.debug("Major compaction triggered on store " + regionInfo
            + ", because keyvalues outdated; time since last major compaction "
            + (now - lowTimestamp) + "ms");
        result = true;
      }
    } else {
      // Multiple files: trigger a major compaction
      LOG.debug("Major compaction triggered on store " + regionInfo
          + "; time since last major compaction " + (now - lowTimestamp) + "ms");
    }
    result = true;  // the bug lives here
  }
  return result;
}
```

The result returned here is exactly what s.isMajorCompaction() returns.
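To see why that last assignment is a bug, here is a reduced sketch (illustrative only, stripped of the HBase types) of the 1.4.10 control flow next to the corrected 1.4.13+ flow. The single-file "skip" branch deliberately leaves result false, but the trailing assignment clobbers it:

```java
public class MajorCompactionBugSketch {

    // 1.4.10 behaviour: the skip decision is overwritten.
    static boolean buggy(int fileCount, boolean shouldSkipSingleFile) {
        boolean result = false;
        if (fileCount == 1) {
            if (!shouldSkipSingleFile) {
                result = true;  // e.g. poor locality or expired TTL
            }
            // skip case intentionally leaves result = false
        } else {
            // multi-file: always compact
        }
        result = true;          // BUG: unconditionally overwrites the skip
        return result;
    }

    // 1.4.13+ behaviour: only the multi-file branch forces result = true.
    static boolean fixed(int fileCount, boolean shouldSkipSingleFile) {
        boolean result = false;
        if (fileCount == 1) {
            if (!shouldSkipSingleFile) {
                result = true;
            }
        } else {
            result = true;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(buggy(1, true));  // true  -- the skip is ignored (the bug)
        System.out.println(fixed(1, true));  // false -- the skip is honoured
    }
}
```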
Summary

CompactionChecker is a ScheduledChore launched by the RegionServer. Every hbase.server.thread.wakefrequency * hbase.server.compactchecker.interval.multiplier it walks all online regions and their stores: when needsCompaction() finds enough candidate store files it queues a system compaction; otherwise isMajorCompaction() defers to the configured compaction policy (here RatioBasedCompactionPolicy.shouldPerformMajorCompaction) to decide whether the jittered, roughly seven-day major compaction window has elapsed.