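HDFS Source Code Analysis, Part 2
Taking a file write as the example, this post traces the source code along the whole path:
FSDataOutputStream out = fs.create(outFile);
1. DistributedFileSystem
DistributedFileSystem extends and implements the abstract FileSystem class. It is the interface through which end-user code interacts with the Hadoop distributed file system.
The original comment reads: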
/****************************************************************
 * Implementation of the abstract FileSystem for the DFS system.
 * This object is the way end-user code interacts with a Hadoop
 * DistributedFileSystem.
 *****************************************************************/
The create method is called:
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
  statistics.incrementWriteOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize,
          checksumOpt);
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize,
          replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}
2. DFSClient
The call ends up in DFSClient's create method:
/**
 * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
 * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
 * a hint to where the namenode should place the file blocks.
 * The favored nodes hint is not persisted in HDFS. Hence it may be honored
 * at the creation time only. HDFS could move the blocks during balancing or
 * replication, to move the blocks from favored nodes. A value of null means
 * no favored nodes for this create
 */
public DFSOutputStream create(String src,
    FsPermission permission,
    EnumSet<CreateFlag> flag,
    boolean createParent,
    short replication,
    long blockSize,
    Progressable progress,
    int buffersize,
    ChecksumOpt checksumOpt,
    InetSocketAddress[] favoredNodes) throws IOException {
  checkOpen();
  if (permission == null) {
    permission = FsPermission.getFileDefault();
  }
  FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
  if(LOG.isDebugEnabled()) {
    LOG.debug(src + ": masked=" + masked);
  }
  String[] favoredNodeStrs = null;
  if (favoredNodes != null) {
    favoredNodeStrs = new String[favoredNodes.length];
    for (int i = 0; i < favoredNodes.length; i++) {
      favoredNodeStrs[i] =
          favoredNodes[i].getHostName() + ":" + favoredNodes[i].getPort();
    }
  }
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      buffersize, dfsClientConf.createChecksum(checksumOpt),
      favoredNodeStrs);
  beginFileLease(result.getFileId(), result);
  return result;
}
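The applyUMask call in create clears the permission bits named by the umask before the file is created. A minimal sketch of that arithmetic in plain Java follows; UmaskSketch and applyUmask are hypothetical names used only for illustration (they are not Hadoop classes), and the 022 umask is just an assumed example value, since the real one comes from the client configuration.

```java
// Hypothetical helper, not part of Hadoop: models how a umask clears permission bits.
final class UmaskSketch {
    /** Returns the permission bits of mode that survive the given umask. */
    static int applyUmask(int mode, int umask) {
        // Keep only the nine rwx bits, then drop every bit set in the umask.
        return mode & ~umask & 0777;
    }

    public static void main(String[] args) {
        // Assumed example values: requested mode 0666, umask 022.
        int masked = applyUmask(0666, 0022);
        System.out.println(Integer.toOctalString(masked)); // prints 644
    }
}
```

With a requested mode of 0666 and a umask of 022, the group and other write bits are removed, which leaves 0644.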
3. DFSOutputStream
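DFSOutputStream creates a file from a stream of bytes. The client application first writes data into the stream's internal buffer. The data is then broken up into packets, each typically 64 KB in size. A packet is made up of chunks; each chunk is typically 512 bytes and has an associated checksum (used to verify integrity, loosely comparable to an MD5 digest of a file).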
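To make the packet and chunk sizes concrete, here is a rough sketch of how many chunks fit into one packet. PacketMath is a hypothetical class written for this post. The 4-byte checksum assumes a CRC32-style checksum, and the 33-byte header reservation is an assumed placeholder, not a value taken from the source shown here.

```java
// Hypothetical helper for illustration only; not Hadoop code.
final class PacketMath {
    /**
     * Number of chunks that fit in one packet after the header space is reserved.
     * Each chunk on the wire is its data bytes plus its checksum bytes.
     */
    static int chunksPerPacket(int packetSize, int headerLen,
                               int bytesPerChecksum, int checksumSize) {
        int chunkSize = bytesPerChecksum + checksumSize;
        // Always allow at least one chunk, even for a tiny packet size.
        return Math.max((packetSize - headerLen) / chunkSize, 1);
    }

    public static void main(String[] args) {
        // 64 KB packet, assumed 33-byte header, 512-byte chunks, assumed 4-byte checksum.
        System.out.println(chunksPerPacket(64 * 1024, 33, 512, 4)); // prints 126
    }
}
```

Under these assumptions a 64 KB packet carries 126 chunks, that is 126 * 512 = 64512 bytes of payload plus 504 bytes of checksums.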
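When the client application fills up the current packet, the packet is enqueued on the data queue (dataQueue). The DataStreamer thread picks packets up from dataQueue, sends each one to the first datanode in the pipeline, and moves it from dataQueue to the ack queue (ackQueue). The ResponseProcessor receives acks from the datanodes. When a successful ack for a packet has been received from all datanodes, the ResponseProcessor removes that packet from ackQueue.
When an error occurs, all outstanding packets are moved out of ackQueue. A new pipeline is set up by removing the bad datanode from the original pipeline, and the DataStreamer then resumes sending packets from dataQueue.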
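The two-queue protocol described above can be modelled in a few lines. This is a single-threaded sketch, not the real implementation: PipelineQueues and its methods are hypothetical names, packets are reduced to their sequence numbers, and the recovery step assumes that unacknowledged packets go back to the front of dataQueue in their original order so they can be resent.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of dataQueue and ackQueue; not Hadoop code.
final class PipelineQueues {
    final Deque<Long> dataQueue = new ArrayDeque<>();
    final Deque<Long> ackQueue = new ArrayDeque<>();

    /** Client side: a full packet is queued for sending. */
    void enqueue(long seqno) {
        dataQueue.addLast(seqno);
    }

    /** Streamer side: send the head packet and park it in ackQueue. Returns -1 if idle. */
    long sendNext() {
        Long seqno = dataQueue.pollFirst();
        if (seqno == null) {
            return -1L;
        }
        ackQueue.addLast(seqno);
        return seqno;
    }

    /** Responder side: acks must arrive in the order the packets were sent. */
    void ack(long seqno) {
        Long expected = ackQueue.peekFirst();
        if (expected == null || expected != seqno) {
            throw new IllegalStateException(
                "Expecting seqno " + expected + " but received " + seqno);
        }
        ackQueue.removeFirst();
    }

    /** Error recovery (assumed behaviour): unacked packets return to the front of dataQueue. */
    void recover() {
        while (!ackQueue.isEmpty()) {
            dataQueue.addFirst(ackQueue.pollLast());
        }
    }
}
```

For example, after enqueueing packets 0, 1 and 2, sending two of them, acknowledging packet 0 and then calling recover(), dataQueue again holds 1 and 2 and ackQueue is empty.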
The original comment is as follows:
/****************************************************************
 * DFSOutputStream creates files from a stream of bytes.
 *
 * The client application writes data that is cached internally by
 * this stream. Data is broken up into packets, each packet is
 * typically 64K in size. A packet comprises of chunks. Each chunk
 * is typically 512 bytes and has an associated checksum with it.
 *
 * When a client application fills up the currentPacket, it is
 * enqueued into dataQueue. The DataStreamer thread picks up
 * packets from the dataQueue, sends it to the first datanode in
 * the pipeline and moves it from the dataQueue to the ackQueue.
 * The ResponseProcessor receives acks from the datanodes. When an
 * successful ack for a packet is received from all datanodes, the
 * ResponseProcessor removes the corresponding packet from the
 * ackQueue.
 *
 * In case of error, all outstanding packets and moved from
 * ackQueue. A new pipeline is setup by eliminating the bad
 * datanode from the original pipeline. The DataStreamer now
 * starts sending packets from the dataQueue.
 ****************************************************************/
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  HdfsFileStatus stat = null;

  // Retry the create if we get a RetryStartFileException up to a maximum
  // number of times
  boolean shouldRetry = true;
  int retryCount = CREATE_RETRY_COUNT;
  while (shouldRetry) {
    shouldRetry = false;
    try {
      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
          new EnumSetWritable<CreateFlag>(flag), createParent, replication,
          blockSize, SUPPORTED_CRYPTO_VERSIONS);
      break;
    } catch (RemoteException re) {
      IOException e = re.unwrapRemoteException(
          AccessControlException.class,
          DSQuotaExceededException.class,
          FileAlreadyExistsException.class,
          FileNotFoundException.class,
          ParentNotDirectoryException.class,
          NSQuotaExceededException.class,
          RetryStartFileException.class,
          SafeModeException.class,
          UnresolvedPathException.class,
          SnapshotAccessControlException.class,
          UnknownCryptoProtocolVersionException.class);
      if (e instanceof RetryStartFileException) {
        if (retryCount > 0) {
          shouldRetry = true;
          retryCount--;
        } else {
          throw new IOException("Too many retries because of encryption" +
              " zone operations", e);
        }
      } else {
        throw e;
      }
    }
  }
  Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
  final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
  out.start();
  return out;
}
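The retry loop in newStreamForCreate follows a common pattern: retry only on one specific exception, up to a fixed number of times, and rethrow everything else immediately. Below is a generic sketch of that pattern. BoundedRetry, RetryableException and callWithRetry are hypothetical names; RetryableException merely stands in for RetryStartFileException.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical generic version of the bounded retry loop; not Hadoop code.
final class BoundedRetry {
    /** Stand-in for the one exception type that is worth retrying. */
    static final class RetryableException extends IOException {
        RetryableException(String message) {
            super(message);
        }
    }

    /**
     * Runs the call, retrying at most maxRetries times when it throws
     * RetryableException. Any other exception propagates on the first failure.
     */
    static <T> T callWithRetry(Callable<T> call, int maxRetries) throws Exception {
        int retriesLeft = maxRetries;
        while (true) {
            try {
                return call.call();
            } catch (RetryableException e) {
                if (retriesLeft <= 0) {
                    throw new IOException("Too many retries", e);
                }
                retriesLeft--;
            }
        }
    }
}
```

As in the original loop, a retry budget of N allows up to N + 1 attempts in total before the wrapped IOException is thrown.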
Packet
private static class Packet {
  private static final long HEART_BEAT_SEQNO = -1L;
  long seqno; // sequencenumber of buffer in block
  final long offsetInBlock; // offset in block
  boolean syncBlock; // this packet forces the current block to disk
  int numChunks; // number of chunks currently in packet
  final int maxChunks; // max chunks in packet
  private byte[] buf;
  private boolean lastPacketInBlock; // is this the last packet in block?

  /**
   * buf is pointed into like follows:
   *  (C is checksum data, D is payload data)
   *
   * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
   *           ^        ^               ^               ^
   *           |        checksumPos     dataStart       dataPos
   *           checksumStart
   *
   * Right before sending, we move the checksum data to immediately precede
   * the actual data, and then insert the header into the buffer immediately
   * preceding the checksum data, so we make sure to keep enough space in
   * front of the checksum data to support the largest conceivable header.
   */
  int checksumStart;
  int checksumPos;
  final int dataStart;
  int dataPos;

  /**
   * Create a new packet.
   *
   * @param pktSize maximum size of the packet,
   *                including checksum data and actual data.
   * @param chunksPerPkt maximum number of chunks per packet.
   * @param offsetInBlock offset in bytes into the HDFS block.
   */
  private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
      int checksumSize) {
    this.lastPacketInBlock = false;
    this.numChunks = 0;
    this.offsetInBlock = offsetInBlock;
    this.seqno = seqno;

    this.buf = buf;

    checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
    checksumPos = checksumStart;
    dataStart = checksumStart + (chunksPerPkt * checksumSize);
    dataPos = dataStart;
    maxChunks = chunksPerPkt;
  }
}
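The offsets computed in the Packet constructor can be checked with a small standalone sketch. PacketLayout is a hypothetical class; the maximum header length is passed in as a parameter because its real value is not shown in the excerpt, and the bufferSize formula is an assumption about how large the buffer must be to hold a full packet.

```java
// Hypothetical model of the Packet buffer offsets; not Hadoop code.
final class PacketLayout {
    final int checksumStart; // first byte of the checksum area
    final int dataStart;     // first byte of the payload area
    final int bufferSize;    // assumed minimum buffer length for a full packet

    PacketLayout(int maxHeaderLen, int chunksPerPkt,
                 int checksumSize, int bytesPerChecksum) {
        // Space for the largest possible header is reserved at the front.
        checksumStart = maxHeaderLen;
        // Checksums for every chunk are stored contiguously after the header space.
        dataStart = checksumStart + chunksPerPkt * checksumSize;
        // Payload for every chunk follows the checksum area.
        bufferSize = dataStart + chunksPerPkt * bytesPerChecksum;
    }

    public static void main(String[] args) {
        // Assumed values: 33-byte header space, 126 chunks, 4-byte checksums, 512-byte chunks.
        PacketLayout layout = new PacketLayout(33, 126, 4, 512);
        System.out.println(layout.checksumStart + " " + layout.dataStart
            + " " + layout.bufferSize); // prints 33 537 65049
    }
}
```

Keeping the checksum and data areas separate while the packet is filled lets both grow independently; they only need to be made adjacent right before sending, as the comment in Packet explains.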
DataStreamer
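DataStreamer is responsible for sending data packets to the datanodes in the pipeline. It retrieves a new block ID and block locations from the namenode, then starts streaming packets to the pipeline of datanodes. Every packet has a sequence number associated with it. When all packets of a block have been sent and an ack has been received for each of them, DataStreamer closes the current block.
DFSOutputStream.start() simply starts this thread:
private synchronized void start() {
  streamer.start();
}
The original comment is as follows:
//
// The DataStreamer class is responsible for sending data packets to the
// datanodes in the pipeline. It retrieves a new blockid and block locations
// from the namenode, and starts streaming packets to the pipeline of
// Datanodes. Every packet has a sequence number associated with
// it. When all the packets for a block are sent out and acks for each
// if them are received, the DataStreamer closes the current block.
//
DataStreamer extends Daemon (a background thread) and therefore indirectly extends Thread, so its core method is run():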
/*
 * streamer thread is the only thread that opens streams to datanode,
 * and closes them. Any error recovery is also done by this thread.
 */
@Override
public void run() {
  long lastPacket = Time.now();
  TraceScope traceScope = null;
  if (traceSpan != null) {
    traceScope = Trace.continueSpan(traceSpan);
  }
  while (!streamerClosed && dfsClient.clientRunning) {

    // if the Responder encountered an error, shutdown Responder
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
        DFSClient.LOG.warn("Caught exception ", e);
      }
    }

    Packet one;
    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
        doSleep = processDatanodeError();
      }

      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.now();
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0 &&
            (stage != BlockConstructionStage.DATA_STREAMING ||
             stage == BlockConstructionStage.DATA_STREAMING &&
             now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
          long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
              timeout : 1000;
          try {
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
          doSleep = false;
          now = Time.now();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
        } else {
          one = dataQueue.getFirst(); // regular data packet
        }
      }
      assert one != null;

      // get new block from namenode.
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if(DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Allocating new block");
        }
        setPipeline(nextBlockOutputStream());
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        if(DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Append to block " + block);
        }
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > blockSize) {
        throw new IOException("BlockSize " + blockSize +
            " is smaller than data size. " +
            " Offset of packet in block " +
            lastByteOffsetInBlock +
            " Aborting file " + src);
      }

      if (one.lastPacketInBlock) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          dataQueue.removeFirst();
          ackQueue.addLast(one);
          dataQueue.notifyAll();
        }
      }

      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DataStreamer block " + block +
            " sending packet " + one);
      }

      // write out data to remote datanode
      try {
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        tryMarkPrimaryDatanodeFailed();
        throw e;
      }
      lastPacket = Time.now();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (streamerClosed || hasError || !dfsClient.clientRunning) {
        continue;
      }

      // Is this block full?
      if (one.lastPacketInBlock) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }

        endBlock();
      }
      if (progress != null) { progress.progress(); }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (restartingNodeIndex == -1) {
        DFSClient.LOG.warn("DataStreamer Exception", e);
      }
      if (e instanceof IOException) {
        setLastException((IOException)e);
      } else {
        setLastException(new IOException("DataStreamer Exception: ",e));
      }
      hasError = true;
      if (errorIndex == -1 && restartingNodeIndex == -1) {
        // Not a datanode issue
        streamerClosed = true;
      }
    }
  }
  if (traceScope != null) {
    traceScope.close();
  }
  closeInternal();
}
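The wait inside run() decides how long the streamer sleeps before it either sends a data packet or falls back to a heartbeat. That timeout computation can be isolated as a pure function. StreamerWait and waitTimeoutMs are hypothetical names; the arithmetic mirrors the three timeout lines in run(), and the 60-second socket timeout in the example is an assumed value.

```java
// Hypothetical extraction of the wait-timeout arithmetic from run(); not Hadoop code.
final class StreamerWait {
    /**
     * How long to wait on dataQueue, in milliseconds.
     * While streaming, wait until half the socket timeout has elapsed since the
     * last packet; otherwise, or if that point has already passed, wait one second.
     */
    static long waitTimeoutMs(int socketTimeoutMs, long now, long lastPacket,
                              boolean dataStreaming) {
        long timeout = socketTimeoutMs / 2 - (now - lastPacket);
        timeout = timeout <= 0 ? 1000 : timeout;
        return dataStreaming ? timeout : 1000;
    }

    public static void main(String[] args) {
        // Assumed 60 s socket timeout, 10 s since the last packet, streaming stage.
        System.out.println(waitTimeoutMs(60_000, 10_000, 0, true)); // prints 20000
    }
}
```

Because the loop stops waiting once half the socket timeout has passed with an empty queue, the streamer then sends a heartbeat packet, which keeps an idle pipeline from timing out.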
ResponseProcessor
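ResponseProcessor handles the responses from the datanodes. When the response for a packet arrives, that packet is removed from ackQueue.
The ResponseProcessor thread is started by initDataStreaming(), which DataStreamer's run method calls:
/**
 * Initialize for data streaming
 */
private void initDataStreaming() {
  this.setName("DataStreamer for file " + src +
      " block " + block);
  response = new ResponseProcessor(nodes);
  response.start();
  stage = BlockConstructionStage.DATA_STREAMING;
}
The original comment reads:
//
// Processes responses from the datanodes. A packet is removed
// from the ackQueue when its response arrives.
//
ResponseProcessor also extends Daemon (a background thread) and therefore indirectly extends Thread, so its core method is run():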
public void run() {
  setName("ResponseProcessor for block " + block);
  PipelineAck ack = new PipelineAck();

  while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
    // process responses from datanodes.
    try {
      // read an ack from the pipeline
      long begin = Time.monotonicNow();
      ack.readFields(blockReplyStream);
      long duration = Time.monotonicNow() - begin;
      if (duration > dfsclientSlowLogThresholdMs
          && ack.getSeqno() != Packet.HEART_BEAT_SEQNO) {
        DFSClient.LOG.warn("Slow ReadProcessor read fields took " + duration
            + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
            + ack + ", targets: " + Arrays.asList(targets));
      } else if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient " + ack);
      }

      long seqno = ack.getSeqno();
      // processes response status from datanodes.
      for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
        final Status reply = ack.getReply(i);
        // Restart will not be treated differently unless it is
        // the local node or the only one in the pipeline.
        if (PipelineAck.isRestartOOBStatus(reply) &&
            shouldWaitForRestart(i)) {
          restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
              Time.now();
          setRestartingNodeIndex(i);
          String message = "A datanode is restarting: " + targets[i];
          DFSClient.LOG.info(message);
          throw new IOException(message);
        }
        // node error
        if (reply != SUCCESS) {
          setErrorIndex(i); // first bad datanode
          throw new IOException("Bad response " + reply +
              " for block " + block +
              " from datanode " + targets[i]);
        }
      }

      assert seqno != PipelineAck.UNKOWN_SEQNO :
          "Ack for unknown seqno should be a failed ack: " + ack;
      if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
        continue;
      }

      // a success ack for a data packet
      Packet one;
      synchronized (dataQueue) {
        one = ackQueue.getFirst();
      }
      if (one.seqno != seqno) {
        throw new IOException("ResponseProcessor: Expecting seqno " +
            " for block " + block +
            one.seqno + " but received " + seqno);
      }
      isLastPacketInBlock = one.lastPacketInBlock;

      // Fail the packet write for testing in order to force a
      // pipeline recovery.
      if (DFSClientFaultInjector.get().failPacket() &&
          isLastPacketInBlock) {
        failPacket = true;
        throw new IOException(
            "Failing the last packet for testing.");
      }

      // update bytesAcked
      block.setNumBytes(one.getLastByteOffsetBlock());

      synchronized (dataQueue) {
        lastAckedSeqno = seqno;
        ackQueue.removeFirst();
        dataQueue.notifyAll();

        one.releaseBuffer(byteArrayManager);
      }
    } catch (Exception e) {
      if (!responderClosed) {
        if (e instanceof IOException) {
          setLastException((IOException)e);
        }
        hasError = true;
        // If no explicit error report was received, mark the primary
        // node as failed.
        tryMarkPrimaryDatanodeFailed();
        synchronized (dataQueue) {
          dataQueue.notifyAll();
        }
        if (restartingNodeIndex == -1) {
          DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
              + " for block " + block, e);
        }
        responderClosed = true;
      }
    }
  }
}
Summary:
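From the source code analysis above we can see that:
DFSOutputStream is the main class for writing files in HDFS. It writes the file through DataStreamer and processes the replies from the datanodes through ResponseProcessor.
Reposted from: https://www.cnblogs.com/davidwang456/p/4778810.html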