HBase Write-Ahead Log (WAL) Processing Source Code Analysis: LogCleaner
Contents

HLog: WALs and oldWALs
Overall flow
HMaster initialization
Scheduled execution
LogCleaner: the log-cleaning class
ReplicationLogCleaner: the log-cleaning class
Summary
HLog: WALs and oldWALs
Let's start with the rules for HLog expiration and HLog deletion.
HLog expiration: once written data has been flushed from the MemStore to disk, the corresponding HLog (stored under /hbase/WALs by default) is automatically moved to the /hbase/oldWALs directory. It is not deleted at this point.
HLog deletion: when the Master starts, it launches a thread that periodically scans the oldWALs directory and deletes the files that qualify. The scan interval is hbase.master.cleaner.interval, 1 minute by default. There are two deletion conditions:
    1. Whether the HLog file is still participating in master-slave replication: if not, it is deleted; if it is, it is kept.
    2. Whether the HLog file has existed in the directory for longer than hbase.master.logcleaner.ttl: if so, it is deleted.
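Taken together, the two rules reduce to a small predicate. The sketch below is illustrative only; the class, method, and parameter names are hypothetical and this is not HBase source code:

```java
import java.util.Set;

// Hypothetical condensation of the two deletion rules; not HBase source code.
public class OldWalCheck {
    /**
     * @param walName           file name of the archived WAL under oldWALs
     * @param ageMillis         how long the file has existed in oldWALs
     * @param ttlMillis         value of hbase.master.logcleaner.ttl
     * @param walsInReplication WAL names still referenced by replication queues in ZooKeeper
     */
    public static boolean deletable(String walName, long ageMillis, long ttlMillis,
                                    Set<String> walsInReplication) {
        // Rule 1: a WAL still referenced by a replication peer must be kept
        if (walsInReplication.contains(walName)) {
            return false;
        }
        // Rule 2: everything else is deletable once it has outlived the TTL
        return ageMillis > ttlMillis;
    }
}
```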
Overall flow
Flowchart download (pos format):
Link: https://pan.baidu.com/s/1szhpVn7RyegE0yqQedACIA
Extraction code: ig9x
Only the WAL-related flow is covered here. All of the code discussed below is marked in the diagram above with its class name, method name, and a note, so you can also read it straight from the source.
HMaster initialization
When HMaster starts up, the HMaster constructor calls the startActiveMasterManager method, which in turn calls finishActiveMasterInitialization(status). The finishActiveMasterInitialization method starts all of the master's service threads; the relevant fragment is:
```java
// start up all service threads.
status.setStatus("Initializing master service threads");
startServiceThreads();
```

The startServiceThreads method is as follows:
```java
/**
 * Start up all services. If any of these threads gets an unhandled exception
 * then they just die with a logged message. This should be fine because
 * in general, we do not expect the master to get such unhandled exceptions
 * as OOMEs; it should be lightly loaded. See what HRegionServer does if
 * need to install an unexpected exception handler.
 */
private void startServiceThreads() throws IOException {
  // Start the executor service pools
  this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
      conf.getInt("hbase.master.executor.openregion.threads", 5));
  this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
      conf.getInt("hbase.master.executor.closeregion.threads", 5));
  this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
      conf.getInt("hbase.master.executor.logreplayops.threads", 10));
  // We depend on there being only one instance of this executor running
  // at a time. To do concurrency, would need fencing of enable/disable of
  // tables.
  // Any time changing this maxThreads to > 1, pls see the comment at
  // AccessController#postCreateTableHandler
  this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
  startProcedureExecutor();

  // Initial cleaner chore
  CleanerChore.initChorePool(conf);
  // Start log cleaner thread
  // Read the log-cleaning interval from the configuration;
  // the default here (60 * 1000 ms) is 1 minute
  int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
  this.logCleaner =
      new LogCleaner(cleanerInterval,
          this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf),
          getMasterFileSystem().getOldLogDir());
  // Schedule the chore; the period is cleanerInterval, which LogCleaner
  // has already set as its execution interval
  getChoreService().scheduleChore(logCleaner);

  // start the hfile archive cleaner thread
  Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
  Map<String, Object> params = new HashMap<String, Object>();
  params.put(MASTER, this);
  this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
      getMasterFileSystem().getFileSystem(), archiveDir, params);
  getChoreService().scheduleChore(hfileCleaner);
  serviceStarted = true;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Started service threads");
  }
  if (!conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
    try {
      replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
          cleanerInterval, this.getZooKeeper(), this.conf);
      getChoreService().scheduleChore(replicationZKLockCleanerChore);
    } catch (Exception e) {
      LOG.error("start replicationZKLockCleanerChore failed", e);
    }
  }
  try {
    replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
        new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
    getChoreService().scheduleChore(replicationZKNodeCleanerChore);
  } catch (Exception e) {
    LOG.error("start replicationZKNodeCleanerChore failed", e);
  }
}
```

Scheduled execution
Within that method, the fragment below is what handles our HLogs: it creates the LogCleaner and schedules it for periodic execution.
```java
// Initial cleaner chore
CleanerChore.initChorePool(conf);
// Start log cleaner thread
// Read the log-cleaning interval from the configuration;
// the default here (60 * 1000 ms) is 1 minute
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
this.logCleaner =
    new LogCleaner(cleanerInterval,
        this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf),
        getMasterFileSystem().getOldLogDir());
// Schedule the chore; the period is cleanerInterval, which LogCleaner
// has already set as its execution interval
getChoreService().scheduleChore(logCleaner);
```

Once scheduled, the LogCleaner.chore() method (defined in the parent class CleanerChore) runs periodically.
```java
@Override
protected void chore() {
  if (getEnabled()) {
    try {
      POOL.latchCountUp();
      if (runCleaner()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Cleaned all WALs under " + oldFileDir);
        }
      } else {
        if (LOG.isTraceEnabled()) {
          LOG.trace("WALs outstanding under " + oldFileDir);
        }
      }
    } finally {
      POOL.latchCountDown();
    }
    // After each cleaner chore, checks if received reconfigure notification while cleaning.
    // First in cleaner turns off notification, to avoid another cleaner updating pool again.
    if (POOL.reconfigNotification.compareAndSet(true, false)) {
      // This cleaner is waiting for other cleaners finishing their jobs.
      // To avoid missing next chore, only wait 0.8 * period, then shutdown.
      POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
    }
  } else {
    LOG.trace("Cleaner chore disabled! Not cleaning.");
  }
}
```

The runCleaner() method above submits our CleanerTask to the pool's task queue:
```java
public Boolean runCleaner() {
  CleanerTask task = new CleanerTask(this.oldFileDir, true);
  POOL.submit(task);
  return task.join();
}
```

LogCleaner: the log-cleaning class
The LogCleaner class performs the actual log cleanup. The key to the periodic cleaning is the compute() method of CleanerTask, a private class inside LogCleaner's parent class CleanerChore. CleanerTask extends RecursiveTask, which we won't cover in depth here; see ForkJoinTask if you want the details. compute() lists all the files under the oldWALs directory and selectively deletes them.
```java
@Override
protected Boolean compute() {
  LOG.trace("Cleaning under " + dir);
  List<FileStatus> subDirs;
  List<FileStatus> tmpFiles;
  final List<FileStatus> files;
  try {
    // if dir doesn't exist, we'll get null back for both of these
    // which will fall through to succeeding.
    subDirs = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
      @Override
      public boolean accept(FileStatus f) {
        return f.isDirectory();
      }
    });
    if (subDirs == null) {
      subDirs = Collections.emptyList();
    }
    // list the files under the oldWALs directory
    tmpFiles = FSUtils.listStatusWithStatusFilter(fs, dir, new FileStatusFilter() {
      @Override
      public boolean accept(FileStatus f) {
        return f.isFile();
      }
    });
    files = tmpFiles == null ? Collections.<FileStatus>emptyList() : tmpFiles;
  } catch (IOException ioe) {
    LOG.warn("failed to get FileStatus for contents of '" + dir + "'", ioe);
    return false;
  }
  boolean allFilesDeleted = true;
  if (!files.isEmpty()) {
    allFilesDeleted = deleteAction(new Action<Boolean>() {
      @Override
      public Boolean act() throws IOException {
        // files holds every file under oldWALs
        return checkAndDeleteFiles(files);
      }
    }, "files");
  }
  boolean allSubdirsDeleted = true;
  if (!subDirs.isEmpty()) {
    final List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
    for (FileStatus subdir : subDirs) {
      CleanerTask task = new CleanerTask(subdir, false);
      tasks.add(task);
      // fork a subtask per subdirectory
      task.fork();
    }
    allSubdirsDeleted = deleteAction(new Action<Boolean>() {
      @Override
      public Boolean act() throws IOException {
        return getCleanResult(tasks);
      }
    }, "subdirs");
  }
  boolean result = allFilesDeleted && allSubdirsDeleted;
  // if and only if files and subdirs under current dir are deleted successfully, and
  // it is not the root dir, then task will try to delete it.
  if (result && !root) {
    result &= deleteAction(new Action<Boolean>() {
      @Override
      public Boolean act() throws IOException {
        return fs.delete(dir, false);
      }
    }, "dir");
  }
  return result;
}
```
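CleanerTask's fork() on each subdirectory, joined later through getCleanResult(tasks), is the standard ForkJoinTask split. For readers who have not met RecursiveTask before, here is a minimal, self-contained example of the same pattern; the range-sum task is purely illustrative and has nothing to do with HBase:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Minimal RecursiveTask demo: split the work, fork one half, compute the
// other inline, then join -- the same shape CleanerTask uses for subdirectories.
public class RangeSum extends RecursiveTask<Long> {
    private final int lo, hi; // sums the half-open range [lo, hi)

    RangeSum(int lo, int hi) { this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        if (hi - lo <= 1000) { // small enough: compute directly
            long s = 0;
            for (int i = lo; i < hi; i++) s += i;
            return s;
        }
        int mid = (lo + hi) >>> 1;
        RangeSum left = new RangeSum(lo, mid);
        left.fork();                                  // run the left half asynchronously
        long right = new RangeSum(mid, hi).compute(); // right half in this thread
        return right + left.join();                   // wait for the forked half
    }

    public static void main(String[] args) {
        System.out.println(new ForkJoinPool().invoke(new RangeSum(0, 10001))); // 0+1+...+10000 = 50005000
    }
}
```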
The previous step calls checkAndDeleteFiles(files), which runs the given files through each of the cleaners to see whether they should be deleted, deleting them if necessary. Its input is the full list of files under oldWALs.
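Before looking at the real method, note that its loop over cleanersChain implements a veto chain: each cleaner can only shrink the deletable set, so a file is deleted only if every cleaner agrees. Stripped of HBase types, the logic looks like this hypothetical sketch (file names stand in for FileStatus, predicates for cleaners):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of checkAndDeleteFiles' chaining; not HBase source code.
public class CleanerChainSketch {
    /** Each predicate returns true if it considers the file deletable. */
    public static List<String> deletable(List<String> files, List<Predicate<String>> cleaners) {
        List<String> remaining = new ArrayList<>(files);
        for (Predicate<String> cleaner : cleaners) {
            remaining.removeIf(f -> !cleaner.test(f)); // a single veto keeps the file
        }
        return remaining;
    }
}
```

The actual method, with full HBase types and the invalid-file fast path, follows.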
```java
/**
 * Run the given files through each of the cleaners to see if it should be deleted, deleting it
 * if necessary.
 * @param files List of FileStatus for the files to check (and possibly delete)
 * @return true iff successfully deleted all files
 */
private boolean checkAndDeleteFiles(List<FileStatus> files) {
  if (files == null) {
    return true;
  }
  // first check to see if the path is valid
  List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
  List<FileStatus> invalidFiles = Lists.newArrayList();
  for (FileStatus file : files) {
    if (validate(file.getPath())) {
      validFiles.add(file);
    } else {
      LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
      invalidFiles.add(file);
    }
  }
  Iterable<FileStatus> deletableValidFiles = validFiles;
  // check each of the cleaners for the valid files
  for (T cleaner : cleanersChain) {
    if (cleaner.isStopped() || getStopper().isStopped()) {
      LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
          + this.oldFileDir);
      return false;
    }
    Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
    // trace which cleaner is holding on to each file
    if (LOG.isTraceEnabled()) {
      ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
      for (FileStatus file : deletableValidFiles) {
        if (!filteredFileSet.contains(file)) {
          LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
        }
      }
    }
    deletableValidFiles = filteredFiles;
  }
  Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
  return deleteFiles(filesToDelete) == files.size();
}
```

ReplicationLogCleaner: the log-cleaning class
Inside checkAndDeleteFiles, cleaner.getDeletableFiles(deletableValidFiles) is called. For WALs the relevant implementation is ReplicationLogCleaner.getDeletableFiles, which decides which files may be deleted using exactly the rule stated at the beginning of this article: a WAL still participating in replication is kept; one that is not is deleted.
Note: everything participating in a peer is stored under the /hbase/replication/rs znode in ZooKeeper.
For example, if ZooKeeper contains the node
/hbase/replication/rs/jast.zh,16020,1576397142865/Indexer_account_indexer_prd/jast.zh%2C16020%2C1576397142865.jast.zh%2C16020%2C1576397142865.regiongroup-0.1579283025645

then the WAL of that name under the oldWALs directory will not be deleted.
```
[jast@jast002 ~]$ hdfs dfs -du -h /hbase/oldWALs/jast015.zh%2C16020%2C1576397142865.jast015.zh%2C16020%2C1576397142865.regiongroup-0.1579283025645
256.0 M  512.0 M  /hbase/oldWALs/jast015.zh%2C16020%2C1576397142865.jast015.zh%2C16020%2C1576397142865.regiongroup-0.1579283025645
```

The getDeletableFiles implementation:

```java
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
  // all members of this class are null if replication is disabled,
  // so we cannot filter the files
  if (this.getConf() == null) {
    return files;
  }
  final Set<String> wals;
  try {
    // The concurrently created new WALs may not be included in the return list,
    // but they won't be deleted because they're not in the checking set.
    wals = loadWALsFromQueues();
  } catch (KeeperException e) {
    LOG.warn("Failed to read zookeeper, skipping checking deletable files");
    return Collections.emptyList();
  }
  return Iterables.filter(files, new Predicate<FileStatus>() {
    @Override
    public boolean apply(FileStatus file) {
      String wal = file.getPath().getName();
      // keep the file if the replication queues still contain it, delete it otherwise
      boolean logInReplicationQueue = wals.contains(wal);
      if (LOG.isDebugEnabled()) {
        if (logInReplicationQueue) {
          // still referenced: keep
          LOG.debug("Found log in ZK, keeping: " + wal);
        } else {
          // not referenced: delete
          LOG.debug("Didn't find this log in ZK, deleting: " + wal);
        }
      }
      return !logInReplicationQueue;
    }
  });
}
```

The loadWALsFromQueues method called above returns the set of all WALs that are still in a replication queue; note the getQueuesZNodeCversion() calls that bracket the read.
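Those bracketing version reads are a generic optimistic-snapshot pattern: read the version, read the data, re-read the version, and retry if anything moved in between. A minimal sketch with hypothetical names, not HBase code:

```java
import java.util.function.Supplier;

// Hypothetical sketch of the optimistic retry used by loadWALsFromQueues:
// keep re-reading until the version is unchanged across the data read.
public class OptimisticSnapshot {
    public static <T> T readConsistent(Supplier<Integer> version, Supplier<T> data) {
        while (true) {
            int v0 = version.get();
            T snapshot = data.get();
            int v1 = version.get();
            if (v0 == v1) {
                return snapshot; // nothing changed while we were reading
            }
            // a concurrent writer touched the data: discard the snapshot and retry
        }
    }
}
```

The real method, shown next, applies the same loop to the replication queues, using the cversion of the queues znode as the version counter.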
```java
/**
 * Load all wals in all replication queues from ZK. This method guarantees to return a
 * snapshot which contains all WALs in the zookeeper at the start of this call even there
 * is concurrent queue failover. However, some newly created WALs during the call may
 * not be included.
 */
private Set<String> loadWALsFromQueues() throws KeeperException {
  for (int retry = 0; ; retry++) {
    int v0 = replicationQueues.getQueuesZNodeCversion();
    List<String> rss = replicationQueues.getListOfReplicators();
    if (rss == null || rss.isEmpty()) {
      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
      return ImmutableSet.of();
    }
    Set<String> wals = Sets.newHashSet();
    for (String rs : rss) {
      // list everything under /hbase/replication/rs in ZooKeeper
      List<String> listOfPeers = replicationQueues.getAllQueues(rs);
      // if rs just died, this will be null
      if (listOfPeers == null) {
        continue;
      }
      // walk every queue
      for (String id : listOfPeers) {
        List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
        if (peersWals != null) {
          wals.addAll(peersWals);
        }
      }
    }
    int v1 = replicationQueues.getQueuesZNodeCversion();
    if (v0 == v1) {
      return wals;
    }
    LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
        v0, v1, retry));
  }
}
```

Summary
At this point the picture is complete: deletion is a periodically executed cleaner thread that lists every file under oldWALs, keeps any file that is still in a peer's replication queue, and deletes the rest.