HBase Replication源码解析之HLog读取
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
在HRegionServer中兩個(gè)量和replication相關(guān),如下所示:
?
[java]?view plain?copy
在ReplicationSourceService中只有一個(gè)方法getWALActionsListener,該方法返回WALActionsListener。ReplicationSinkService同樣也是一個(gè)接口類,它有一個(gè)方法replicateLogEntries。在HRegionServer的如下代碼段中會(huì)啟動(dòng)replicationservice。
?
?
[java]?view plain?copy
startReplicationService中做了三件事,分別是調(diào)用ReplicationSourceManager的init方法,初始化replicationSink,初始化調(diào)度線程池scheduleThreadPool;
startReplicationService方法中調(diào)用了ReplicationSourceManager的init方法,init中遍歷replicationPeers中的peerid,并以該id為參數(shù),調(diào)用addSource方法,在addSource中針對(duì)每一個(gè)peerid構(gòu)造了一個(gè)對(duì)象ReplicationSource,ReplicationSource是個(gè)守護(hù)進(jìn)程,這里初始化的時(shí)候并不是通過構(gòu)造函數(shù),而是通過getReplicationSource函數(shù),在這個(gè)方法里先獲得了一個(gè)ReplicationSource的接口,接著調(diào)用init初始化該接口,此外,getReplicationSource還有一個(gè)重要的作用是它實(shí)例化了replicationEndpoint(HBaseInterClusterReplicationEndpoint)。回到addSource這個(gè)方法,它返回前調(diào)用了ReplicationSource的startup方法,startup是個(gè)挺有意思的方法,代碼如下:
?
ReplicationSource是個(gè)守護(hù)線程,在startUp中啟動(dòng)了自己。。。。這么說也就是replicationPeers中的每個(gè)peerid都表示了一個(gè)slave集群,而每個(gè)slave集群都有一個(gè)自己的ReplicationSource線程。現(xiàn)在的重點(diǎn)就落在了ReplicationSource這個(gè)守護(hù)線程的處理邏輯,可以從它的run方法入手分析。
?
run中有如下幾個(gè)關(guān)鍵步驟,首先:
? ? ? ? ? 1、啟動(dòng)replicationEndpoint :Service.State state = replicationEndpoint.start().get();
? ? ?? ? ?2、構(gòu)造walEntryFilter:this.walEntryFilter = new ChainWALEntryFilter(filters);
? ? ?? ? ?3、進(jìn)入一個(gè)循環(huán),循環(huán)持續(xù)運(yùn)行至守護(hù)線程ReplicationSource終止:
? ? ? ? ? ? ? ? ? ??? ? ?while(isActive) {
? ? ? ? ? ? ? ? ? ? ? ? ?? ? ?獲取log path;
? ? ? ? ? ? ? ? ? ? ? ? ?? ? ?調(diào)用openReader打開當(dāng)前path的log reader(后文詳解);
? ? ? ? ? ? ? ? ? ? ? ? ?? ? ?從reader中依次讀取WAL.Entry并放入一個(gè)List<WAL.Entry>的數(shù)據(jù)結(jié)構(gòu)中,方法調(diào)用如下:
? ? ? ? ? ? ? ? ? ? ? ? ? ? ??? ? ?readAllEntriesToReplicateOrNextFile(currentWALisBingWrittenTo, entries)
? ? ? ? ? ? ? ? ? ? ? ? ?? ? ?最后調(diào)用shipEdits將entries發(fā)送到遠(yuǎn)端集群;
? ? ? ? ? ? ? ? ? ? ?????}
?
發(fā)送WALEntry到從集群的邏輯在方法shipEdits中完成,ship方法接收一個(gè)List<WAL.Entry>類型的參數(shù)entries,在shipEdits中entries參數(shù)被包裝進(jìn)replicateContext中并發(fā)送到從集群,這部分的主要代碼如下所示:
還記得前文中說到,replicationEndpoint在getReplicationSource中初始化為HBaseInterClusterReplicationEndpoint類型的變量。進(jìn)入HBaseInterClusterReplicationEndpoint的replicate方法的實(shí)現(xiàn),該方法首先從參數(shù)replicateContext中獲得List<Entry> entries,關(guān)鍵的wal傳遞在下面這段代碼中:
其中最后一句將Entry對(duì)象序列化之后由文首R(shí)egionServer中初始化的ReplicationSinkService發(fā)送到遠(yuǎn)端集群;
以上這些就是大概的replication時(shí),wal跨集群傳遞的一些細(xì)節(jié)實(shí)現(xiàn)。接下來回過頭詳細(xì)解釋上文留下的一個(gè)小辮子,就是圍繞ReplicationSource的openReader方法的實(shí)現(xiàn),分析這個(gè)調(diào)用的目的是理清wal的讀邏輯是什么樣的。
?
ReplicationSource的openReader以currentPath為參數(shù),調(diào)用ReplicationWALReaderManager的openReader
?
ReplicationWALReaderManager的openReader通過WALFactory.createReader返回指定文件的reader;
?
看看WALFactory.createReader中的關(guān)鍵代碼吧:
?
?
可見Reader是在這里構(gòu)建的,我們以最常見的lrClass屬于ProtobufLogReader.class為例來解釋,首先初始化一個(gè)數(shù)據(jù)輸入流FSDataInputStream,通過這個(gè)流打開文件fs(fs在輸入?yún)?shù)中指定),根據(jù)isPbWal選擇new不同的Reader實(shí)例,最后調(diào)用reader的init方法完成初始化工作。這里的Reader大多數(shù)是DefaultWALProvider.Reader類型的。
?
Reader創(chuàng)建已經(jīng)分析完畢,那讀實(shí)現(xiàn)是什么樣的?
?
讀的動(dòng)作主要在readAllEntriesToReplicateOrNextFile中,該方法接收一個(gè)List<WAL.Entry>類型的參數(shù)entries,也就是說讀到的各個(gè)log entry在entries中返回,下面一一分析readAllEntriesToReplicateOrNextFile中的主要邏輯。
? ? ? ? ? 1、this.repLogReader.seek();
?? ? ? ? ?2、WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
?? ? ? ? ?3、進(jìn)入循環(huán)
?? ? ? ? ? ? ? ? while(entry != null) {
?? ? ? ? ? ? ? ? ? ? ? //過濾掉已經(jīng)消費(fèi)掉的log entry
?? ? ? ? ? ? ? ? ? ? ?if (replicationEndpoint.canReplicateToSameCluster()
? ? ? ? ? ? ? ? ? ? ? ? ? || !entry.getKey().getClusterIds().contains(peerClusterId)) {
? ? ? ? ? ? ? ? ? ? ? ? ? entry = walEntryFilter.filter(entry); ?//過濾的邏輯在walEntryFilter中實(shí)現(xiàn)
? ? ? ? ? ? ? ? ? ? ? ? ? entries.add(entry);
? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? try {
?? ? ? ? ? ? ? ? ? ? ? ? ? ? ?entry = this.repLogReader.readNextAndSetPosition();
? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? }
?
?
? ? ?? ? ?4、各種metrics處理;
?
WALEntryFilter的作用是在把wal entries發(fā)送到slave集群前過濾掉某些并不需要的發(fā)送WAL Entries,它有很多個(gè)實(shí)現(xiàn)類,所有的類都實(shí)現(xiàn)了filter方法,這些不同的WALEntryFilter可以通過ChainWALEntryFilter構(gòu)成一條責(zé)任鏈。HLog文件讀出的wal entries流經(jīng)責(zé)任鏈,篩選出需要replicate的walEntry,這是典型的責(zé)任鏈模式的應(yīng)用。
轉(zhuǎn)載于:https://my.oschina.net/sniperLi/blog/910764
總結(jié)
以上是生活随笔為你收集整理的HBase Replication源码解析之HLog读取的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [BZOJ 1076][SCOI2008
- 下一篇: 常用的 16 个 Sublime Tex