Parsing HDP aggregated log content: IFile and TFile
This tool parses the aggregated logs stored on HDFS. It consists of four classes. Package them into a jar and upload it to the server, or download the aggregated log files from HDFS to a local directory, then run: java -jar <jar name> <log path>.
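For example, assuming the jar-with-dependencies assembly produced by the pom.xml at the end of this post, and a hypothetical local directory holding the downloaded log files:

java -jar YarnLogFileReader-1.0-SNAPSHOT-jar-with-dependencies.jar /tmp/app-logs/application_1557457099458_0010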
效果圖:
代碼:
IndexedFormatLogReader.java — reads the indexed ("IFile") aggregated log format:

package YarnLogFileReader;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
import org.apache.hadoop.io.file.tfile.Compression;
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;

import java.io.*;
import java.util.*;

public class IndexedFormatLogReader implements LogReader {

    public void printContainerLogForFile(Path path, Configuration conf) throws Exception {
        Compression.Algorithm compressName = Compression.getCompressionAlgorithmByName("gz");
        Decompressor decompressor = compressName.getDecompressor();
        FileContext fileContext = FileContext.getFileContext(path.toUri(), conf);
        // one stream for the metadata at the end of the file, one for the log bodies
        FSDataInputStream fsDataInputStream = fileContext.open(path);
        FSDataInputStream fsDataInputStream1 = fileContext.open(path);
        long fileLength = fileContext.getFileStatus(path).getLen();

        // the 4-byte int sitting 36 bytes before EOF is the length of the serialized IndexedLogsMeta
        fsDataInputStream.seek(fileLength - 4L - 32L);
        int offset = fsDataInputStream.readInt();
        byte[] array = new byte[offset];
        fsDataInputStream.seek(fileLength - (long) offset - 4L - 32L);
        fsDataInputStream.readFully(array);   // readFully: a plain read() might return fewer bytes
        LogAggregationIndexedFileController.IndexedLogsMeta logMeta =
                (LogAggregationIndexedFileController.IndexedLogsMeta) SerializationUtils.deserialize(array);

        Iterator iter = logMeta.getLogMetas().iterator();
        while (iter.hasNext()) {
            LogAggregationIndexedFileController.IndexedPerAggregationLogMeta perAggregationLogMeta =
                    (LogAggregationIndexedFileController.IndexedPerAggregationLogMeta) iter.next();
            Iterator iter1 = new TreeMap(perAggregationLogMeta.getLogMetas()).entrySet().iterator();
            while (iter1.hasNext()) {
                Map.Entry<String, List<LogAggregationIndexedFileController.IndexedFileLogMeta>> log =
                        (Map.Entry) iter1.next();
                Iterator iter2 = log.getValue().iterator();
                InputStream in = null;
                while (iter2.hasNext()) {
                    LogAggregationIndexedFileController.IndexedFileLogMeta indexedFileLogMeta =
                            (LogAggregationIndexedFileController.IndexedFileLogMeta) iter2.next();
                    // each log file is a gzip-compressed byte range inside the aggregated file
                    in = compressName.createDecompressionStream(
                            new BoundedRangeFileInputStream(fsDataInputStream1,
                                    indexedFileLogMeta.getStartIndex(),
                                    indexedFileLogMeta.getFileCompressedSize()),
                            decompressor, 262144);
                    StringBuilder sb = new StringBuilder();
                    String containerStr = String.format("Container: %s on %s",
                            indexedFileLogMeta.getContainerId(), path.getName());
                    sb.append(containerStr + "\n");
                    sb.append("LogType: " + indexedFileLogMeta.getFileName() + "\n");
                    sb.append("LogLastModifiedTime: " + new Date(indexedFileLogMeta.getLastModifiedTime()) + "\n");
                    sb.append("LogLength: " + indexedFileLogMeta.getFileSize() + "\n");
                    sb.append("LogContents:\n");
                    BufferedReader br = new BufferedReader(new InputStreamReader(in));
                    System.out.println(sb.toString());
                    String line = null;
                    while ((line = br.readLine()) != null) {
                        System.out.println(line);
                    }
                    System.out.printf("End of LogType: %s\n", indexedFileLogMeta.getFileName());
                    System.out.printf("*****************************************************************************\n\n");
                }
            }
        }
        fsDataInputStream.close();
        fsDataInputStream1.close();
    }
}
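A note on the layout this reader relies on (inferred from the seek arithmetic above rather than from a formal specification): the indexed ("IFile") aggregated log file ends with a Java-serialized IndexedLogsMeta blob, followed by a 4-byte int holding the blob's length and 32 trailing bytes written by LogAggregationIndexedFileController (apparently a checksum/UUID). The reader therefore seeks to fileLength - 36 to read the length, seeks back to fileLength - length - 36 to deserialize the metadata, and then uses each IndexedFileLogMeta's start index and compressed size to decompress exactly that log's gzip byte range.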
LogReader.java — the common interface implemented by both readers:

package YarnLogFileReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public interface LogReader {
    void printContainerLogForFile(Path path, Configuration conf) throws Exception;
}
TFileLogReader.java — reads the classic TFile aggregated log format:

package YarnLogFileReader;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.util.Times;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;

public class TFileLogReader implements LogReader {

    @Override
    public void printContainerLogForFile(Path path, Configuration conf) throws Exception {
        try {
            AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf, path);
            AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
            FileContext context = FileContext.getFileContext(path.toUri(), conf);
            FileStatus status = context.getFileStatus(path);
            long size = status.getLen();
            byte[] buf = new byte['\uffff'];          // 65535-byte copy buffer
            DataInputStream valueStream = reader.next(key);
            // each TFile key is a container id; its value stream holds that container's
            // log files back to back until EOF
            while (valueStream != null) {
                while (true) {
                    try {
                        String fileType = valueStream.readUTF();
                        String fileLengthStr = valueStream.readUTF();
                        long fileLength = Long.parseLong(fileLengthStr);
                        LogToolUtils.outputContainerLog(key.toString(), path.getName(), fileType,
                                fileLength, size, Times.format(status.getModificationTime()),
                                valueStream, (OutputStream) System.out, buf,
                                ContainerLogAggregationType.AGGREGATED);
                        byte[] b = this.aggregatedLogSuffix(fileType).getBytes(Charset.forName("UTF-8"));
                        ((OutputStream) System.out).write(b, 0, b.length);
                    } catch (EOFException eofException) {
                        break;                        // no more log files for this container
                    }
                }
                key = new AggregatedLogFormat.LogKey();
                valueStream = reader.next(key);       // move on to the next container
            }
        } catch (IOException ioe) {
            // thrown when the file is not a TFile at all; just skip it
            if ("Not a valid BCFile.".equals(ioe.getMessage())) {
                return;
            } else {
                throw ioe;
            }
        }
    }

    private String aggregatedLogSuffix(String fileName) {
        StringBuilder sb = new StringBuilder();
        String endOfFile = "End of LogType:" + fileName;
        sb.append("\n" + endOfFile + "\n");
        sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + "\n\n");
        return sb.toString();
    }
}
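The TFile path leans entirely on the stock Hadoop classes: AggregatedLogFormat.LogReader walks the container keys in the file, and LogToolUtils.outputContainerLog copies each log file to stdout in the familiar "yarn logs" layout. The "Not a valid BCFile." check quietly skips files that are not TFiles at all (TFile is layered on the BCFile container format), which is what happens when an IFile-format log reaches this reader.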
YarnLogFileReader.java — the entry point; it walks the given path, probes each file's format, and dispatches to the right reader:

package YarnLogFileReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.azure.AzureException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;

import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;

public class YarnLogFileReader {

    private static List list = new ArrayList();
    private static Configuration conf = new YarnConfiguration();

    static {
        // register the Azure storage file system implementations (wasb/wasbs/abfs/abfss)
        conf.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb");
        conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs");
        conf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
        conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
        conf.set("fs.abfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem");
        conf.set("fs.abfss.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem");
        conf.set("fs.AbstractFileSystem.abfs.impl", "org.apache.hadoop.fs.azurebfs.Abfs");
        conf.set("fs.AbstractFileSystem.abfss.impl", "org.apache.hadoop.fs.azurebfs.Abfss");
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("Usage: java -classpath '/etc/hadoop/conf:./target/YarnLogFileReader-1.0-SNAPSHOT-dependencies.jar:/usr/hdp/current/hadoop-hdfs-client/lib/adls2-oauth2-token-provider.jar' YarnLogFileReader.YarnLogFileReader <path of folder contains logs>");
            System.out.println("Example: java -classpath '/etc/hadoop/conf:./target/YarnLogFileReader-1.0-SNAPSHOT-dependencies.jar:/usr/hdp/current/hadoop-hdfs-client/lib/adls2-oauth2-token-provider.jar' YarnLogFileReader.YarnLogFileReader wasb://lazhuhdi-2019-05-09t07-12-39-811z@lzlazhuhdi.blob.core.windows.net//app-logs/chenghao.guo/logs-ifile/application_1557457099458_0010");
            System.exit(1);
        }
        // On an HDInsight cluster the host name "headnodehost" resolves; if it does not,
        // assume we are off-cluster and set up credentials interactively.
        try {
            InetAddress headnodehost = InetAddress.getByName("headnodehost");
        } catch (UnknownHostException ex) {
            System.out.println("Not running on cluster");
            conf.set("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem");
            conf.set("fs.adls.impl", "org.apache.hadoop.fs.adl.AdlFileSystem");
            conf.set("fs.AbstractFileSystem.adl.impl", "org.apache.hadoop.fs.adl.Adl");
            conf.set("fs.AbstractFileSystem.adls.impl", "org.apache.hadoop.fs.adl.Adl");
            YarnLogFileReader app = new YarnLogFileReader(false, args[0]);
            app.printAllContainerLog(args[0]);
            System.exit(0);
        }
        YarnLogFileReader app = new YarnLogFileReader(true, "");
        app.printAllContainerLog(args[0]);
    }

    public YarnLogFileReader(boolean isCluster, String path) throws IOException {
        // On a cluster the configuration under /etc/hadoop/conf already carries the
        // storage credentials, so only the off-cluster case needs interactive setup.
        if (!isCluster) {
            Console console = System.console();
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            int schemeIndex;
            if ((schemeIndex = path.indexOf("://")) != -1) {
                // the path already carries a scheme, e.g. wasb://container@account/...
                String scheme = path.substring(0, schemeIndex);
                switch (scheme) {
                    case "wasb":
                    case "wasbs":
                    case "abfs":
                    case "abfss":
                        if ("wasb".equals(scheme) || "wasbs".equals(scheme))
                            System.out.println("Scheme is blob storage");
                        else
                            System.out.println("Scheme is adls gen2");
                        String accountName = path.substring(path.indexOf("@") + 1,
                                path.indexOf("/", schemeIndex + 3));
                        System.out.printf("Storage key (%s):", accountName);
                        char[] storageKeyChars = console.readPassword();
                        String storageKey = new String(storageKeyChars);
                        conf.set("fs.azure.account.key." + accountName, storageKey);
                        conf.set("fs.defaultFS", path.substring(0, path.indexOf("/", schemeIndex + 3)));
                        break;
                    case "adl":
                        System.out.println("Scheme is adls gen1");
                        String adlsAccountName = path.substring(schemeIndex + 3,
                                path.indexOf("/", schemeIndex + 3));
                        System.out.printf("Client ID (%s): ", adlsAccountName);
                        String clientId = reader.readLine();
                        System.out.printf("Client Secret (%s): ", adlsAccountName);
                        char[] clientSecretChars = console.readPassword();
                        String clientSecret = new String(clientSecretChars);
                        System.out.printf("Tenant ID (%s): ", adlsAccountName);
                        String tenantId = reader.readLine();
                        conf.set("dfs.adls.oauth.access.token.provider.type", "ClientCredential");
                        conf.set("dfs.adls.oauth2.refresh.url",
                                "https://login.microsoftonline.com/" + tenantId + "/oauth2/token");
                        conf.set("dfs.adls.oauth2.client.id", clientId);
                        conf.set("dfs.adls.oauth2.credential", clientSecret);
                        conf.set("fs.defaultFS", path.substring(0, path.indexOf("/", schemeIndex + 3)));
                        break;
                    default:
                        conf.set("fs.defaultFS", "file:///");
                        conf.set("fs.AbstractFileSystem.file.impl", "org.apache.hadoop.fs.local.LocalFs");
                        System.out.println("Try local file system");
                }
            } else {
                // no scheme in the path: ask which storage type to use
                System.out.print("Type scheme (wasb, wasbs, abfs, abfss, adl):");
                String scheme = reader.readLine();
                switch (scheme) {
                    case "wasb":
                    case "wasbs":
                    case "abfs":
                    case "abfss":
                        if ("wasb".equals(scheme) || "wasbs".equals(scheme))
                            System.out.println("Scheme is blob storage");
                        else
                            System.out.println("Scheme is adls gen2");
                        System.out.print("Storage Account Name:");
                        String accountName = reader.readLine();
                        accountName = resolveAccountName(accountName, scheme);
                        System.out.printf("Container Name (%s): ", accountName);
                        String containerName = reader.readLine();
                        System.out.printf("Storage key (%s): ", accountName);
                        char[] storageKeyChars = console.readPassword();
                        String storageKey = new String(storageKeyChars);
                        conf.set("fs.defaultFS", scheme + "://" + containerName + "@" + accountName);
                        conf.set("fs.azure.account.key." + accountName, storageKey);
                        break;
                    case "adl":
                    case "adls":
                        System.out.println("Scheme is adls gen1");
                        System.out.print("Data Lake Account Name:");
                        String adlsAccountName = reader.readLine();
                        adlsAccountName = resolveAccountName(adlsAccountName, scheme);
                        System.out.printf("Client ID (%s): ", adlsAccountName);
                        String clientId = reader.readLine();
                        System.out.printf("Client Secret (%s): ", adlsAccountName);
                        char[] clientSecretChars = console.readPassword();
                        String clientSecret = new String(clientSecretChars);
                        System.out.printf("Tenant ID (%s): ", adlsAccountName);
                        String tenantId = reader.readLine();
                        conf.set("fs.defaultFS", scheme + "://" + adlsAccountName);
                        conf.set("dfs.adls.oauth.access.token.provider.type", "ClientCredential");
                        conf.set("dfs.adls.oauth2.refresh.url",
                                "https://login.microsoftonline.com/" + tenantId + "/oauth2/token");
                        conf.set("dfs.adls.oauth2.client.id", clientId);
                        conf.set("dfs.adls.oauth2.credential", clientSecret);
                        break;
                    default:
                        conf.set("fs.defaultFS", "file:///");
                }
            }
        }
    }

    // Turn a bare account name into the fully qualified storage endpoint for the scheme.
    private String resolveAccountName(String accountName, String scheme) {
        if (accountName.indexOf(".") != -1)
            accountName = accountName.substring(0, accountName.indexOf("."));
        switch (scheme) {
            case "wasb":
            case "wasbs":
                accountName += ".blob.core.windows.net";
                break;
            case "abfs":
            case "abfss":
                accountName += ".dfs.core.windows.net";
                break;
            case "adl":
                accountName += ".azuredatalakestore.net";
                break;
        }
        return accountName;
    }

    private void printAllContainerLog(String file) throws Exception {
        List result = getAllFiles(new Path(file));
        if (result.size() == 0) {
            System.out.println("No file found");
            System.exit(0);
        }
        for (int i = 0; i < result.size(); i++) {
            printContainerLogForFile((Path) result.get(i));
        }
    }

    private void printContainerLogForFile(Path path) throws Exception {
        try {
            LogReader logReader = probeFileFormat(path);
            logReader.printContainerLogForFile(path, conf);
        } catch (Exception ex) {
            // skip files that cannot be parsed and keep going with the rest
            return;
        }
    }

    // Decide whether a file is indexed (IFile) format or TFile format.
    private LogReader probeFileFormat(Path path) throws Exception {
        FileContext fileContext = FileContext.getFileContext(path.toUri(), conf);
        FSDataInputStream fsDataInputStream = fileContext.open(path);
        long fileLength = fileContext.getFileStatus(path).getLen();
        try {
            // an IFile ends with a 4-byte meta length followed by 32 trailing bytes
            fsDataInputStream.seek(fileLength - 4L - 32L);
            int offset = fsDataInputStream.readInt();
            if (offset >= 10485760)
                throw new Exception();            // implausibly large meta block: not an IFile
            byte[] array = new byte[offset];
            fsDataInputStream.seek(fileLength - (long) offset - 4L - 32L);
            fsDataInputStream.close();
            return new IndexedFormatLogReader();
        } catch (Exception eofex) {
            try {
                // fall back to TFile: this constructor throws if the file is not a valid TFile
                AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf, path);
                return new TFileLogReader();
            } catch (Exception ex) {
                System.out.printf("The file %s is neither an indexed (IFile) nor a TFile aggregated log file\n",
                        path.toString());
                throw ex;
            }
        }
    }

    // Collect the given file, or every file under the given directory (recursively).
    private List getAllFiles(Path path) throws Exception {
        try {
            FileSystem fs = FileSystem.newInstance(conf);
            if (!fs.getFileStatus(path).isDirectory())
                list.add(path);
            else {
                FileStatus[] files = fs.listStatus(path);
                for (int i = 0; i < files.length; i++) {
                    if (files[i].isDirectory())
                        getAllFiles(files[i].getPath());
                    else
                        list.add(files[i].getPath());
                }
            }
            return list;
        } catch (AzureException ex) {
            System.out.println("Unable to initialize the filesystem or unable to list file status, please check input parameters");
            throw ex;
        }
    }
}
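probeFileFormat uses a cheap heuristic to pick a reader: it tries to read the IFile footer (the 4-byte length 36 bytes before EOF) and, if the value looks plausible (under 10 MB), treats the file as indexed format; otherwise it falls back to constructing an AggregatedLogFormat.LogReader, which throws unless the file is a valid TFile. printContainerLogForFile(Path) swallows per-file exceptions, so one unreadable file does not abort the scan of a whole log directory.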
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.microsoft.css.apacbigdata</groupId>
  <artifactId>YarnLogFileReader</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>YarnLogFileReader</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-azure</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-azure-datalake</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.5</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <!-- bundle all project dependencies -->
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <!-- mainClass in the manifest makes an executable jar -->
          <archive>
            <manifest>
              <mainClass>YarnLogFileReader.YarnLogFileReader</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <!-- bind to the packaging phase -->
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
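A plain mvn package should be enough to build: the maven-assembly-plugin execution above is bound to the package phase and, with the jar-with-dependencies descriptor and the mainClass entry, should produce an executable target/YarnLogFileReader-1.0-SNAPSHOT-jar-with-dependencies.jar, which is the jar name the run command at the top assumes. Note that the code imports org.apache.commons.lang3 classes, which commons-lang 2.5 does not provide; they are expected to arrive transitively through hadoop-common.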