Analysis of the hadoop fsck Command + Source Code Walkthrough
Analysis of the fsck command
HDFS provides the fsck command for checking various inconsistencies. fsck reports file problems such as missing blocks or under-replicated blocks. Its usage is as follows:
$HADOOP_HOME/bin/hdfs fsck <path>
[-list-corruptfileblocks |
[-move | -delete | -openforwrite]
[-files [-blocks [-locations | -racks | -replicaDetails | -upgradedomains]]]
[-includeSnapshots] [-showprogress]
[-storagepolicies] [-maintenance]
[-blockId <blk_Id>] [-replicate]
<path>  the starting point of the check
-move  move corrupted files to /lost+found
-delete  delete corrupted files
-openforwrite  print files currently open for write
-files  print every file being checked
-blocks  print the block report
-locations  print the location of every block
-racks  print the datanode network topology
By default, fsck skips files that are currently open for write; use the -openforwrite option to have them reported.
The usage shown above matches the official documentation's description of the fsck command.
An example of the output of an fsck run:
[hadoop@master1 ~]$ hadoop-current/bin/hdfs fsck hdfs://mycluster/test/tail.txt -files -blocks -locations
Connecting to namenode via http://master1:50070/fsck?ugi=hadoop&files=1&blocks=1&locations=1&path=%2Ftest%2Ftail.txt
FSCK started by hadoop (auth:SIMPLE) from /XX.XX.XX.XX for path /test/tail.txt at Thu Aug 26 15:28:38 CST 2021
/test/tail.txt 27 bytes, replicated: replication=3, 1 block(s):
Under replicated BP-465154060-10.96.83.87-1627619151215:blk_1073741841_1017. Target Replicas is 3 but found 2 live replica(s), 0 decommissioned replica(s), 0 decommissioning replica(s).
0. BP-465154060-10.96.83.87-1627619151215:blk_1073741841_1017 len=27 Live_repl=2 [DatanodeInfoWithStorage[XX.XX.XX.XX:9866,DS-f0c50895-ebae-4285-8a05-c7c05c5715a1,DISK], DatanodeInfoWithStorage[XX.XX.XX.XX:9866,DS-84cd2957-a0cf-4e79-af55-93e938d84ecd,DISK]]

Status: HEALTHY
Number of data-nodes: 2
Number of racks: 1
Total dirs: 0
Total symlinks: 0

Replicated Blocks:
Total size: 27 B
Total files: 1
Total blocks (validated): 1 (avg. block size 27 B)
Minimally replicated blocks: 1 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 1 (100.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 3
Average block replication: 2.0
Missing blocks: 0
Corrupt blocks: 0
Missing replicas: 1 (33.333332 %)

Erasure Coded Block Groups:
Total size: 0 B
Total files: 0
Total block groups (validated): 0
Minimally erasure-coded block groups: 0
Over-erasure-coded block groups: 0
Under-erasure-coded block groups: 0
Unsatisfactory placement block groups: 0
Average block group size: 0.0
Missing block groups: 0
Corrupt block groups: 0
Missing internal blocks: 0

FSCK ended at Thu Aug 26 15:28:38 CST 2021 in 1 milliseconds

The filesystem under path '/test/tail.txt' is HEALTHY

The client issues the fsck request
The test code is as follows:
@Test
public void FsckShell() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource(new Path("/Users/didi/hdfs-site.xml"));
    conf.addResource(new Path("/Users/didi/core-site.xml"));
    FsShell fsShell = new FsShell();
    fsShell.setConf(conf);
    String[] args = {"-openforwrite", "/test"};
    ByteArrayOutputStream bStream = new ByteArrayOutputStream();
    PrintStream out = new PrintStream(bStream, true);
    DFSck dfsck = new DFSck(conf, out);
    int errCode = ToolRunner.run(dfsck, args);
    System.out.println(errCode);
    System.out.println(bStream.toString());
}

Step into the ToolRunner.run() call from here.
The entry point of the fsck tool is the org.apache.hadoop.hdfs.tools.DFSck class; its main logic lives in the doWork() method.
Here is the getCurrentNamenodeAddress() method:
private URI getCurrentNamenodeAddress(Path target) throws IOException {
    Configuration conf = getConf();
    // get the filesystem object to verify it is an HDFS system
    final FileSystem fs = target.getFileSystem(conf);
    if (!(fs instanceof DistributedFileSystem)) {
        System.err.println("FileSystem is " + fs.getUri());
        return null;
    }
    // getAddressOfActive() issues another RPC to the cluster and returns the
    // address of the namenode that is currently active for this path
    return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf,
        DFSUtil.getHttpClientScheme(conf));
}

DFSUtil.getInfoServer() is shown below; it mainly assembles a URI:
public static URI getInfoServer(InetSocketAddress namenodeAddr,
    Configuration conf, String scheme) throws IOException {
  String[] suffixes = null;
  if (namenodeAddr != null) {
    // if non-default namenode, try reverse look up
    // the nameServiceID if it is available
    // namenodeAddr is the socket address serving the target path,
    // e.g. master1/XX.XX.XX.XX:8020
    suffixes = getSuffixIDs(conf, namenodeAddr,
        DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
  }
  String authority;
  if ("http".equals(scheme)) {
    authority = getSuffixedConf(conf, DFS_NAMENODE_HTTP_ADDRESS_KEY,
        DFS_NAMENODE_HTTP_ADDRESS_DEFAULT, suffixes);
  } else if ("https".equals(scheme)) {
    // ... (https branch elided in this excerpt)
  }
  return URI.create(scheme + "://" + authority);
}

Let's step in further:
static String[] getSuffixIDs(final Configuration conf,
    final InetSocketAddress address, final String... keys) {
  AddressMatcher matcher = new AddressMatcher() {
    @Override
    public boolean match(InetSocketAddress s) {
      return address.equals(s);
    }
  };
  for (String key : keys) {
    String[] ids = getSuffixIDs(conf, key, null, null, matcher);
    if (ids != null && (ids[0] != null || ids[1] != null)) {
      return ids;
    }
  }
  return null;
}

This method uses the dfs.namenode.rpc-address entries in the local configuration to match the address obtained over RPC (held by the matcher); when a match is found, it returns the nameservice and namenode that the address belongs to.
static String[] getSuffixIDs(final Configuration conf, final String addressKey,
    String knownNsId, String knownNNId, final AddressMatcher matcher) {
  String nameserviceId = null;
  String namenodeId = null;
  int found = 0;
  Collection<String> nsIds = getNameServiceIds(conf);
  for (String nsId : emptyAsSingletonNull(nsIds)) {
    if (knownNsId != null && !knownNsId.equals(nsId)) {
      continue;
    }
    Collection<String> nnIds = getNameNodeIds(conf, nsId);
    for (String nnId : emptyAsSingletonNull(nnIds)) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(String.format("addressKey: %s nsId: %s nnId: %s",
            addressKey, nsId, nnId));
      }
      if (knownNNId != null && !knownNNId.equals(nnId)) {
        continue;
      }
      String key = addKeySuffixes(addressKey, nsId, nnId);
      String addr = conf.get(key);
      if (addr == null) {
        continue;
      }
      InetSocketAddress s = null;
      try {
        s = NetUtils.createSocketAddr(addr);
      } catch (Exception e) {
        LOG.warn("Exception in creating socket address " + addr, e);
        continue;
      }
      if (!s.isUnresolved() && matcher.match(s)) {
        nameserviceId = nsId;
        namenodeId = nnId;
        found++;
      }
    }
  }
  if (found > 1) { // Only one address must match the local address
    String msg = "Configuration has multiple addresses that match "
        + "local node's address. Please configure the system with "
        + DFS_NAMESERVICE_ID + " and "
        + DFS_HA_NAMENODE_ID_KEY + ". Choose the last address.";
    throw new HadoopIllegalArgumentException(msg);
  }
  return new String[] { nameserviceId, namenodeId };
}

Finally, back in DFSck, here is the last part of the doWork() method:
if (namenodeAddress == null) {
  // Error message already output in {@link #getCurrentNamenodeAddress()}
  System.err.println("DFSck exiting.");
  return 0;
}
// prepend the namenode address; at this point the url is
// http://master1:50070/fsck?ugi=XXX&openforwrite=1
url.insert(0, namenodeAddress.toString());
// append the directory; the url becomes
// http://master1:50070/fsck?ugi=XXX&openforwrite=1&path=%2Ftest
// In fact the namenode implements fsck as a servlet; this command-line tool
// merely submits a request to that endpoint.
url.append("&path=").append(URLEncoder.encode(
    Path.getPathWithoutSchemeAndAuthority(dirpath).toString(), "UTF-8"));
System.err.println("Connecting to namenode via " + url.toString());
// if corrupt blocks are to be listed, run the corrupt-block report instead
if (doListCorruptFileBlocks) {
  return listCorruptFileBlocks(dir, url.toString());
}
URL path = new URL(url.toString());
URLConnection connection;
try {
  connection = connectionFactory.openConnection(path, isSpnegoEnabled);
} catch (AuthenticationException e) {
  throw new IOException(e);
}
InputStream stream = connection.getInputStream();
BufferedReader input = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
String line = null;
String lastLine = null;
int errCode = -1;
try {
  while ((line = input.readLine()) != null) {
    out.println(line);
    lastLine = line;
  }
} finally {
  input.close();
}
if (lastLine.endsWith(NamenodeFsck.HEALTHY_STATUS)) {
  errCode = 0;
} else if (lastLine.endsWith(NamenodeFsck.CORRUPT_STATUS)) {
  errCode = 1;
} else if (lastLine.endsWith(NamenodeFsck.NONEXISTENT_STATUS)) {
  errCode = 0;
} else if (lastLine.contains("Incorrect blockId format:")) {
  errCode = 0;
} else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONED_STATUS)) {
  errCode = 2;
} else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONING_STATUS)) {
  errCode = 3;
}
return errCode;

The NameNode responds to the fsck command
A global search in IDEA finds, in the setupServlets() method of the NameNodeHttpServer class, the call httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true), which confirms that the namenode does serve the request through a servlet.
In the doGet() method of the FsckServlet class we find: new NamenodeFsck(conf, nn, bm.getDatanodeManager().getNetworkTopology(), pmap, out, totalDatanodes, remoteAddress).fsck().
Let's look at the constructor of this NamenodeFsck class:
NamenodeFsck(Configuration conf, NameNode namenode,
    NetworkTopology networktopology, Map<String, String[]> pmap,
    PrintWriter out, int totalDatanodes, InetAddress remoteAddress) {
  this.conf = conf;
  this.namenode = namenode;
  this.networktopology = networktopology;
  this.out = out;
  this.totalDatanodes = totalDatanodes;
  this.remoteAddress = remoteAddress;
  this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null, networktopology,
      namenode.getNamesystem().getBlockManager().getDatanodeManager()
          .getHost2DatanodeMap());

  for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
    String key = it.next();
    if (key.equals("path")) { this.path = pmap.get("path")[0]; }
    else if (key.equals("move")) { this.doMove = true; }
    else if (key.equals("delete")) { this.doDelete = true; }
    else if (key.equals("files")) { this.showFiles = true; }
    else if (key.equals("blocks")) { this.showBlocks = true; }
    else if (key.equals("locations")) { this.showLocations = true; }
    else if (key.equals("racks")) { this.showRacks = true; }
    else if (key.equals("storagepolicies")) { this.showStoragePolcies = true; }
    else if (key.equals("openforwrite")) { this.showOpenFiles = true; }
    else if (key.equals("listcorruptfileblocks")) {
      this.showCorruptFileBlocks = true;
    } else if (key.equals("startblockafter")) {
      this.currentCookie[0] = pmap.get("startblockafter")[0];
    } else if (key.equals("includeSnapshots")) {
      this.snapshottableDirs = new ArrayList<String>();
    } else if (key.equals("blockId")) {
      this.blockIds = pmap.get("blockId")[0];
    }
  }
}

As you can see, pmap holds the request parameters; each recognized key sets the corresponding flag to true. The fsck() method is what actually serves the request.
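In isolation, that loop is just a query-parameter-to-flag mapping. A minimal standalone sketch (FsckFlags is a hypothetical illustration class, not part of Hadoop, and it covers only a few of the keys):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified mirror of the pmap handling in the constructor:
// every recognized query parameter flips a display flag.
public class FsckFlags {
    boolean showFiles, showBlocks, showLocations, showOpenFiles;
    String path = "/";

    static FsckFlags parse(Map<String, String[]> pmap) {
        FsckFlags f = new FsckFlags();
        for (Map.Entry<String, String[]> e : pmap.entrySet()) {
            switch (e.getKey()) {
                case "path":         f.path = e.getValue()[0];  break;
                case "files":        f.showFiles = true;        break;
                case "blocks":       f.showBlocks = true;       break;
                case "locations":    f.showLocations = true;    break;
                case "openforwrite": f.showOpenFiles = true;    break;
                default: break; // unknown keys are ignored, as in NamenodeFsck
            }
        }
        return f;
    }

    public static void main(String[] args) {
        Map<String, String[]> pmap = new HashMap<>();
        pmap.put("path", new String[]{"/test"});
        pmap.put("openforwrite", new String[]{"1"});
        FsckFlags f = parse(pmap);
        // → /test openforwrite=true files=false
        System.out.println(f.path + " openforwrite=" + f.showOpenFiles
            + " files=" + f.showFiles);
    }
}
```

Only the presence of a key matters; the parameter values (always "1" in the URLs above) are ignored for the boolean options.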
public void fsck() {
  final long startTime = Time.monotonicNow();
  try {
    // if blockIds is non-null, the command asks to check those specific blocks
    if (blockIds != null) {
      String[] blocks = blockIds.split(" ");
      StringBuilder sb = new StringBuilder();
      sb.append("FSCK started by " + UserGroupInformation.getCurrentUser()
          + " from " + remoteAddress + " at " + new Date());
      out.println(sb.toString());
      sb.append(" for blockIds: \n");
      for (String blk : blocks) {
        if (blk == null || !blk.contains(Block.BLOCK_FILE_PREFIX)) {
          out.println("Incorrect blockId format: " + blk);
          continue;
        }
        out.print("\n");
        blockIdCK(blk);
        sb.append(blk + "\n");
      }
      LOG.info(sb.toString());
      namenode.getNamesystem().logFsckEvent("/", remoteAddress);
      out.flush();
      return;
    }

    String msg = "FSCK started by " + UserGroupInformation.getCurrentUser()
        + " from " + remoteAddress + " for path " + path + " at " + new Date();
    LOG.info(msg); // the user's fsck operation is recorded in the namenode log
    out.println(msg);
    namenode.getNamesystem().logFsckEvent(path, remoteAddress);

    if (snapshottableDirs != null) {
      SnapshottableDirectoryStatus[] snapshotDirs =
          namenode.getRpcServer().getSnapshottableDirListing();
      if (snapshotDirs != null) {
        for (SnapshottableDirectoryStatus dir : snapshotDirs) {
          snapshottableDirs.add(dir.getFullPath().toString());
        }
      }
    }

    // resolve the path to its inode
    final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path);
    if (file != null) {
      // branch depending on whether corrupt blocks should be listed
      if (showCorruptFileBlocks) {
        listCorruptFileBlocks();
        return;
      }
      // the flag decides whether storage policies are shown
      if (this.showStoragePolcies) {
        storageTypeSummary = new StoragePolicySummary(
            namenode.getNamesystem().getBlockManager().getStoragePolicies());
      }

      Result res = new Result(conf);
      // check() decides what to print based on the flags set in the constructor
      check(path, file, res);

      out.println(res);
      out.println(" Number of data-nodes:\t\t" + totalDatanodes);
      out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
      if (this.showStoragePolcies) {
        out.print(storageTypeSummary.toString());
      }
      out.println("FSCK ended at " + new Date() + " in "
          + (Time.monotonicNow() - startTime + " milliseconds"));

      // If there were internal errors during the fsck operation, we want to
      // return FAILURE_STATUS, even if those errors were not immediately
      // fatal. Otherwise many unit tests will pass even when there are bugs.
      if (internalError) {
        throw new IOException("fsck encountered internal errors!");
      }

      // DFSck client scans for the string HEALTHY/CORRUPT to check the status
      // of file system and return appropriate code. Changing the output
      // string might break testcases. Also note this must be the last line
      // of the report.
      if (res.isHealthy()) {
        out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS);
      } else {
        out.print("\n\nThe filesystem under path '" + path + "' " + CORRUPT_STATUS);
      }
    } else {
      out.print("\n\nPath '" + path + "' " + NONEXISTENT_STATUS);
    }
  } catch (Exception e) {
    String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS;
    LOG.warn(errMsg, e);
    out.println("FSCK ended at " + new Date() + " in "
        + (Time.monotonicNow() - startTime + " milliseconds"));
    out.println(e.getMessage());
    out.print("\n\n" + errMsg);
  } finally {
    out.close();
  }
}

Finally, let's see how check() decides what to print; most of the logic sits in its if branches:
void check(String parent, HdfsFileStatus file, Result res) throws IOException {
  String path = file.getFullName(parent);
  boolean isOpen = false;

  if (file.isDir()) {
    if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
      String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path
          : path + Path.SEPARATOR) + HdfsConstants.DOT_SNAPSHOT_DIR;
      HdfsFileStatus snapshotFileInfo =
          namenode.getRpcServer().getFileInfo(snapshotPath);
      check(snapshotPath, snapshotFileInfo, res);
    }
    byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
    DirectoryListing thisListing;
    if (showFiles) {
      out.println(path + " <dir>");
    }
    res.totalDirs++;
    do {
      assert lastReturnedName != null;
      thisListing = namenode.getRpcServer().getListing(
          path, lastReturnedName, false);
      if (thisListing == null) {
        return;
      }
      HdfsFileStatus[] files = thisListing.getPartialListing();
      for (int i = 0; i < files.length; i++) {
        check(path, files[i], res);
      }
      lastReturnedName = thisListing.getLastName();
    } while (thisListing.hasMore());
    return;
  }

  if (file.isSymlink()) {
    if (showFiles) {
      out.println(path + " <symlink>");
    }
    res.totalSymlinks++;
    return;
  }

  long fileLen = file.getLen();
  // Get block locations without updating the file access time
  // and without block access tokens
  LocatedBlocks blocks = null;
  FSNamesystem fsn = namenode.getNamesystem();
  fsn.readLock();
  try {
    blocks = fsn.getBlockLocations(fsn.getPermissionChecker(), path, 0,
        fileLen, false, false).blocks;
  } catch (FileNotFoundException fnfe) {
    blocks = null;
  } finally {
    fsn.readUnlock();
  }
  if (blocks == null) { // the file is deleted
    return;
  }

  isOpen = blocks.isUnderConstruction();
  if (isOpen && !showOpenFiles) {
    // We collect these stats about open files to report with default options
    res.totalOpenFilesSize += fileLen;
    res.totalOpenFilesBlocks += blocks.locatedBlockCount();
    res.totalOpenFiles++;
    return;
  }
  res.totalFiles++;
  res.totalSize += fileLen;
  res.totalBlocks += blocks.locatedBlockCount();
  if (showOpenFiles && isOpen) {
    out.print(path + " " + fileLen + " bytes, "
        + blocks.locatedBlockCount() + " block(s), OPENFORWRITE: ");
  } else if (showFiles) {
    out.print(path + " " + fileLen + " bytes, "
        + blocks.locatedBlockCount() + " block(s): ");
  } else {
    out.print('.');
  }
  if (res.totalFiles % 100 == 0) { out.println(); out.flush(); }

  int missing = 0;
  int corrupt = 0;
  long missize = 0;
  int underReplicatedPerFile = 0;
  int misReplicatedPerFile = 0;
  StringBuilder report = new StringBuilder();
  int i = 0;
  for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
    ExtendedBlock block = lBlk.getBlock();
    boolean isCorrupt = lBlk.isCorrupt();
    String blkName = block.toString();
    DatanodeInfo[] locs = lBlk.getLocations();
    NumberReplicas numberReplicas = namenode.getNamesystem()
        .getBlockManager().countNodes(block.getLocalBlock());
    int liveReplicas = numberReplicas.liveReplicas();
    res.totalReplicas += liveReplicas;
    short targetFileReplication = file.getReplication();
    res.numExpectedReplicas += targetFileReplication;
    if (liveReplicas < res.minReplication) {
      res.numUnderMinReplicatedBlocks++;
    }
    if (liveReplicas > targetFileReplication) {
      res.excessiveReplicas += (liveReplicas - targetFileReplication);
      res.numOverReplicatedBlocks += 1;
    }
    // keep track of storage tier counts
    if (this.showStoragePolcies && lBlk.getStorageTypes() != null) {
      StorageType[] storageTypes = lBlk.getStorageTypes();
      storageTypeSummary.add(Arrays.copyOf(storageTypes, storageTypes.length),
          fsn.getBlockManager().getStoragePolicy(file.getStoragePolicy()));
    }
    // Check if block is Corrupt
    if (isCorrupt) {
      corrupt++;
      res.corruptBlocks++;
      out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId()
          + " block " + block.getBlockName() + "\n");
    }
    if (liveReplicas >= res.minReplication)
      res.numMinReplicatedBlocks++;
    if (liveReplicas < targetFileReplication && liveReplicas > 0) {
      res.missingReplicas += (targetFileReplication - liveReplicas);
      res.numUnderReplicatedBlocks += 1;
      underReplicatedPerFile++;
      if (!showFiles) {
        out.print("\n" + path + ": ");
      }
      out.println(" Under replicated " + block + ". Target Replicas is "
          + targetFileReplication + " but found " + liveReplicas
          + " replica(s).");
    }
    // count mis replicated blocks
    BlockPlacementStatus blockPlacementStatus = bpPolicy
        .verifyBlockPlacement(lBlk.getLocations(), targetFileReplication);
    if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
      res.numMisReplicatedBlocks++;
      misReplicatedPerFile++;
      if (!showFiles) {
        if (underReplicatedPerFile == 0)
          out.println();
        out.print(path + ": ");
      }
      out.println(" Replica placement policy is violated for " + block + ". "
          + blockPlacementStatus.getErrorDescription());
    }
    report.append(i + ". " + blkName + " len=" + block.getNumBytes());
    if (liveReplicas == 0) {
      report.append(" MISSING!");
      res.addMissing(block.toString(), block.getNumBytes());
      missing++;
      missize += block.getNumBytes();
    } else {
      report.append(" repl=" + liveReplicas);
      if (showLocations || showRacks) {
        StringBuilder sb = new StringBuilder("[");
        for (int j = 0; j < locs.length; j++) {
          if (j > 0) { sb.append(", "); }
          if (showRacks)
            sb.append(NodeBase.getPath(locs[j]));
          else
            sb.append(locs[j]);
        }
        sb.append(']');
        report.append(" " + sb.toString());
      }
    }
    report.append('\n');
    i++;
  }

  if ((missing > 0) || (corrupt > 0)) {
    if (!showFiles && (missing > 0)) {
      out.print("\n" + path + ": MISSING " + missing
          + " blocks of total size " + missize + " B.");
    }
    res.corruptFiles++;
    if (isOpen) {
      LOG.info("Fsck: ignoring open file " + path);
    } else {
      if (doMove) copyBlocksToLostFound(parent, file, blocks);
      if (doDelete) deleteCorruptedFile(path);
    }
  }

  if (showFiles) {
    if (missing > 0) {
      out.print(" MISSING " + missing + " blocks of total size "
          + missize + " B\n");
    } else if (underReplicatedPerFile == 0 && misReplicatedPerFile == 0) {
      out.print(" OK\n");
    }
    if (showBlocks) {
      out.print(report.toString() + "\n");
    }
  }
}

Summary:
fsck is an interface exposed by the namenode itself, accessed through a servlet. How you reach it does not matter, as long as a request is submitted to this endpoint: the hadoop shell command submits it from Java through a tool class, but you can just as well assemble the URL directly in a browser, for example:
http://10.4.19.42:50070/fsck?ugi=hadoop&path=/tmp/hadoop/wordcountjavain&files=1&blocks=1&locations=1&racks=1
等價(jià)于hadoop fsck /tmp/hadoop/wordcountjavain -files -blocks -locations -racks
In essence, fsck reads from the namespace managed by the namenode's FSNamesystem (and its FSDirectory), together with the information accumulated from block reports: inode attributes, block lists, replica counts, and corruption state are all read straight out of the namenode's memory. The namenode does not go out to the datanodes to locate the blocks and have them verified. Because these are "ready-made" in-memory results, they can be stale: after you change something on disk, it takes a while before fsck reflects it accurately. For example, if I swap a block file on a datanode for a corrupt one, the namenode has not yet received a block report about it, so the fsck result will not immediately show that the block is bad.
Deletions, on the other hand, show up in fsck immediately. As introduced earlier, whether through the trash or with -skipTrash, a delete only marks the files for removal (keeping a ledger, with a cleanup thread periodically issuing RPCs that tell the corresponding datanodes to delete the block files), but it directly updates the namenode's in-memory namespace, including the block-report state: the block information the datanodes reported is also modified by the delete operation.
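As a final recap of the per-block bookkeeping in check(): every verdict comes from comparing the live replica count (taken from the namenode's in-memory BlockManager state) against the file's target replication. The sketch below is illustrative only — the real method maintains overlapping counters (a block can be counted as both under-minimum and under-replicated) rather than one exclusive category per block:

```java
// Illustrative-only classification of a block, condensed from the branches
// in NamenodeFsck.check(); the real code increments overlapping counters.
public class BlockStateSketch {

    static String classify(int liveReplicas, int targetReplication, int minReplication) {
        if (liveReplicas == 0) return "MISSING";                      // no live replica at all
        if (liveReplicas < minReplication) return "UNDER_MIN_REPLICATED";
        if (liveReplicas < targetReplication) return "UNDER_REPLICATED";
        if (liveReplicas > targetReplication) return "OVER_REPLICATED";
        return "OK";
    }

    public static void main(String[] args) {
        // The situation from the sample run above: target is 3, only 2 live replicas.
        System.out.println(classify(2, 3, 1)); // → UNDER_REPLICATED
    }
}
```

This is also why the summary section of the report (Under-replicated blocks, Missing blocks, and so on) can be produced without contacting a single datanode.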
Partly based on: https://blog.csdn.net/tracymkgld/article/details/18044577