Hadoop 副本存储策略的源码修改和设置
Table of Contents
- BlockPlacementPolicy
- Hadoop 提供的 BlockPlacementPolicy 實現
- BlockPlacementPolicyDefault 源碼閱讀
- 首先
- 處理favoredNodes
- 三副本選擇
- 再到具體的選擇
- 源碼閱讀的幾個注意
- 修改HDFS默認的副本放置機制
- RackAwareness 機架感知
大多數的叫法都是副本放置策略,實質上是HDFS對所有數據的位置放置策略,并非只是針對數據的副本。因此Hadoop的源碼里有block replicator(configuration)、 BlockPlacementPolicy(具體邏輯源碼)兩種叫法。
主要用途:上傳文件時決定文件在HDFS上存儲的位置(具體到datanode上的具體存儲介質,如具體到存儲在哪塊硬盤);rebalance、datanode退出集群、副本數量更改等導致數據移動的操作中,數據移動的具體位置。
BlockPlacementPolicy
BlockPlacementPolicy 作為虛基類提供了基本的接口,具體的子類重點實現下面?選擇副本?、?驗證副本放置是否滿足要求?、?選擇能夠刪除的副本?三個函數:
/*** 核心的副本放置策略實現,返回副本放置數量的存儲位置* **如果有效節點數量不夠(少于副本數),返回盡可能多的節點,而非失敗**** @param srcPath 上傳文件的路徑* @param numOfReplicas 除下面chosen參數里已經選擇的datanode,還需要的副本數量* @param writer 寫數據的機器, null if not in the cluster. 一般用于放置第一個副本以降低網絡通信* @param chosen 已經選擇的節點* @param returnChosenNodes 返回結果里是否包含chosen的datanode* @param excludedNodes 不選的節點* @param blocksize 塊大小* @return 排序好的選擇結果*/public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,int numOfReplicas,Node writer,List<DatanodeStorageInfo> chosen,boolean returnChosenNodes,Set<Node> excludedNodes,long blocksize,BlockStoragePolicy storagePolicy);/*** 判斷傳入的放置方式是否符合要求*/abstract public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, int numOfReplicas);/*** 當副本數量較多時,選擇需要刪除的節點*/abstract public List<DatanodeStorageInfo> chooseReplicasToDelete(Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas,List<StorageType> excessTypes, DatanodeDescriptor addedNode,DatanodeDescriptor delNodeHint);Hadoop 提供的 BlockPlacementPolicy 實現
Hadoop提供了BlockPlacementPolicyDefault、BlockPlacementPolicyWithNodeGroup、AvailableSpaceBlockPlacementPolicy三種實現(hadoop 2.7.7)。
其中BlockPlacementPolicyDefault是默認三副本策略的實現:第一個副本盡可能放在寫入數據的節點,第二個副本放在與第一個副本不在同一機架(rack)下的節點,第三個副本與第二副本放在同一個機架。
BlockPlacementPolicyWithNodeGroup中第一、二個副本和Default副本放置相同,第三個副本在第二個機架下選擇不同node group的結點。AvailableSpaceBlockPlacementPolicy實現存儲平衡。Hadoop3.1中還加入了BlockPlacementPolicyRackFaultTolerant將數據存儲到更多的機架下,BlockPlacementPolicyWithUpgradeDomain使用默認的副本放置策略,但是3個副本選擇的datanode都要有不同的upgrade domains(為了方便大集群中datanode的更新和重啟、將結點分配給不同的upgrade domain)。
通過改變dfs.block.replicator.classname?能夠選擇具體的實現類,默認值為org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault。(Hadoop 2.7.7下,貌似不同版本的Hadoop的命名還不一樣,而且2.7.7默認的配置文件里還沒有,需要在源碼中查)
BlockPlacementPolicyDefault 源碼閱讀
public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,int numOfReplicas,Node writer,List<DatanodeStorageInfo> chosen,boolean returnChosenNodes,Set<Node> excludedNodes,long blocksize,BlockStoragePolicy storagePolicy);chooseTarget函數實現了具體的三副本策略。各種特殊情況(如只有1個副本、datanode數量不夠、集群拓撲不滿足要求等)的考慮讓代碼看起來比較復雜,常規情況直接跟著調試代碼走會跳過很多異常處理部分,便于裂解正常流程。
在副本的選擇上用了各種帶chooseTarget函數,注意有幾個函數結果是通過參數傳出而不是返回值。
主要實現思路:
首先
srcPath沒有被考慮,被直接舍棄:
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,excludedNodes, blocksize, storagePolicy, flags); // ignore srcPath因此默認的副本放置策略,在同一文件包含多個block時,每個block的存儲位置獨立考慮,并非存儲在同一datanode。
處理favoredNodes
上傳文件時可以指定favoredNodes(默認為空),首先對favoredNodes所在的節點判斷是否合適。如果滿足條件的節點數還低于副本數,則添加新的副本。
// --------------Choose favored nodes ---------------// 從favored nodes中選擇,在上傳文件時可以指定List<DatanodeStorageInfo> results = new ArrayList<>();boolean avoidStaleNodes = stats != null&& stats.isAvoidingStaleDataNodesForWrite();int maxNodesAndReplicas[] = getMaxNodesPerRack(0, numOfReplicas);numOfReplicas = maxNodesAndReplicas[0];int maxNodesPerRack = maxNodesAndReplicas[1];chooseFavouredNodes(src, numOfReplicas, favoredNodes,favoriteAndExcludedNodes, blocksize, maxNodesPerRack, results,avoidStaleNodes, storageTypes);// ---------------如果滿足要求的favored nodes數量不足-----------if (results.size() < numOfReplicas) {// Not enough favored nodes, choose other nodes, based on block// placement policy (HDFS-9393).numOfReplicas -= results.size();for (DatanodeStorageInfo storage : results) {// add localMachine and related nodes to favoriteAndExcludedNodesaddToExcludedNodes(storage.getDatanodeDescriptor(),favoriteAndExcludedNodes);}DatanodeStorageInfo[] remainingTargets =chooseTarget(src, numOfReplicas, writer,new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,favoriteAndExcludedNodes, blocksize, storagePolicy, flags);for (int i = 0; i < remainingTargets.length; i++) {results.add(remainingTargets[i]);}}三副本選擇
實現邏輯在 chooseTargetInOrder(…) 函數中
// 第一個副本的選擇 if (numOfResults == 0) {writer = chooseLocalStorage(writer, excludedNodes, blocksize,maxNodesPerRack, results, avoidStaleNodes, storageTypes, true).getDatanodeDescriptor();if (--numOfReplicas == 0) {return writer;} }// 選擇與第一個副本不在同一Rack下的第二個副本 final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor(); if (numOfResults <= 1) {chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);if (--numOfReplicas == 0) {return writer;} }// 第三個副本 if (numOfResults <= 2) {final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();// 第一、二副本在同一Rack下時選第三個副本 // (前面的favoredNodes以及集群條件可能造成這種情況)if (clusterMap.isOnSameRack(dn0, dn1)) {chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);} else if (newBlock){ // 正常情況,第二副本的localRack下選第三副本chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);} else { // 其它的以外chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);}if (--numOfReplicas == 0) {return writer;} }// 如果副本數量還沒到0,剩下的副本隨機選擇 chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,maxNodesPerRack, results, avoidStaleNodes, storageTypes); return writer;再到具體的選擇
選擇具體的存儲位置被上面包裝到了 chooseRemoteRack 和 chooseLocalRack 兩個函數。
實際調用時只是 chooseRandom 函數,在限定的rack下選擇一個隨機的節點。
源碼閱讀的幾個注意
代碼在直接閱讀時各種跳,但主線思路比較明確。主要帶來閱讀困難的位置:
修改HDFS默認的副本放置機制
可以選擇直接復制或繼承BlockPlacementPolicyDefault的實現,或者直接繼承BlockPlacementPolicy類編寫對應的接口具體實現。
將編譯好的jar包放入$HADOOP_PREFIX/share/hadoop/common下(或者其它的Hadoop jar包路徑)。
改變dfs.block.replicator.classname?為上面的實現類,要帶包的名稱。
RackAwareness 機架感知
Hadoop 并不能自動檢測集群的機架狀態,而是要預先設置機架的狀態,通過腳本或java類將datanode的ip轉換成具體的機架上的位置。
官方文檔介紹了基本思路,雖然實現上介紹得不是太清楚,只要將輸入的ip轉換成"/rackNum"的形式即可。
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/RackAwareness.html
?Categories:cloud computing
?Tags:hadoop
總結
以上是生活随笔為你收集整理的Hadoop 副本存储策略的源码修改和设置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第五章I/O管理
- 下一篇: Python3之logging输出写入日