hdfs深入:10、hdfs的javaAPI操作
生活随笔
收集整理的這篇文章主要介紹了
hdfs深入:10、hdfs的javaAPI操作
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
/**
* 遞歸遍歷hdfs中所有的文件路徑
*/
@Test
public void getAllHdfsFilePath() throws URISyntaxException, IOException {
//獲取fs的客戶端
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
Path path = new Path("/");
FileStatus[] fileStatuses = fileSystem.listStatus(path);
//循環遍歷fileStatuses,如果是文件,打印文件的路徑,如果是文件夾,繼續遞歸進去
for (FileStatus fileStatus : fileStatuses){
if (fileStatus.isDirectory()){//文件夾
getDirectoryFiles(fileSystem,fileStatus);
}else{ //文件
System.out.println(fileStatus.getPath().toString());
}
}
//方法二:
System.out.println("方法二:利用官方提供API");
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true);
while (locatedFileStatusRemoteIterator.hasNext()){
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
System.out.println(next.getPath());
}
//關閉fs的客戶端
fileSystem.close();
}
/**
* 遞歸獲取文件路徑
*/
public void getDirectoryFiles(FileSystem fileSystem,FileStatus fileStatus) throws IOException {
//通過fileStatus獲取文件夾路徑
Path path = fileStatus.getPath(); //該fileStatus必定為一個文件夾
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus status:fileStatuses){
if (fileStatus.isDirectory()){
getDirectoryFiles(fileSystem,status);
}else{
System.out.println(fileStatus.getPath().toString());
}
}
}
/**
* 下載hdfs文件到本地
*/
@Test
public void copyHdfsToLocal() throws Exception {
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
FSDataInputStream inputStream = fileSystem.open(new Path("hdfs://node01:8020/aa/haha2.txt"));
FileOutputStream outputStream = new FileOutputStream(new File("d:\\install-log.txt"));
IOUtils.copy(inputStream,outputStream);
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
//方法二:利用官方API
//有報錯:java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
fileSystem.copyToLocalFile(new Path("hdfs://node01:8020/aa/haha2.txt"),new Path("file:///d:\\install-log2.txt"));
fileSystem.close();
}
/**
* hdfs上面創建文件夾
*/
@Test
public void createHdfsDir() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
fileSystem.mkdirs(new Path("/aa/bb/cc/"));
fileSystem.close();
}
/**
* hdfs的文件上傳
*/
@Test
public void uploadFileToHdfs() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
//注:new Path()中的字符串參數如果省略file:///或hdfs://的話,默認會在參數前添加hdfs://node01:8020,即,默認是hdfs路徑
fileSystem.copyFromLocalFile(false,new Path("file:///d:\\output.txt"),new Path("/aa/bb/cc"));
//第二種方法:通過流的方式
//輸出流,負責將數據輸出到hdfs的路徑上面去
FSDataOutputStream outputStream = fileSystem.create(new Path("/aa/bb/cc/empSel.hdfs"));
//通過輸入流讀取本地文件系統的文件
InputStream inputStream = new FileInputStream(new File("d:\\empSel.txt"));
IOUtils.copy(inputStream,outputStream);
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
fileSystem.close();
}
/**
* hdfs的權限校驗機制
*/
@Test
public void hdfsPermission() throws Exception{
/*
在所有節點的hdfs-site.xml中設置開啟權限驗證:
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>
普通的filesystem,執行時會報錯:org.apache.hadoop.security.AccessControlException:
Permission denied: user=Administrator, access=READ, inode="/config/core-site.xml":root:supergroup:-rw-------
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
*/
//通過偽造用戶來獲取分布式文件系統的客戶端
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root");
//從hdfs上下載文件到本地
FSDataInputStream inputStream = fileSystem.open(new Path("/config/core-site.xml"));
FileOutputStream outputStream = new FileOutputStream(new File("d:\\core-site.txt"));
IOUtils.copy(inputStream,outputStream);
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
// fileSystem.copyFromLocalFile(new Path("file:///d:\\transferIndex.txt"),new Path("/aa/bb/cc/"));
// fileSystem.delete(new Path("/aa/bb/cc/"),false);
fileSystem.close();
}
/**
* hdfs在上傳小文件的時候進行合并
* 在我們的hdfs 的shell命令模式下,可以通過命令行將很多的hdfs文件合并成一個大文件下載到本地:
* hdfs dfs -getmerge /config/*.xml ./hello.xml
* 上傳時也能將小文件合并到一個大文件里面去。
*/
@Test
public void mergeFile()throws Exception{
//獲取分布式文件系統
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.8.100:8020"), new Configuration(),"root");
FSDataOutputStream outputStream = fileSystem.create(new Path("/bigFile.xml"));
//獲取本地所有小文件的輸入流
//首先獲取本地文件系統
LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration());
FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("file:///D:\\上傳小文件合并"));
for (FileStatus fileStatus:fileStatuses){
Path path = fileStatus.getPath();
FSDataInputStream fsDataInputStream = localFileSystem.open(path);
IOUtils.copy(fsDataInputStream,outputStream);
IOUtils.closeQuietly(fsDataInputStream);
}
IOUtils.closeQuietly(outputStream);
fileSystem.close();
localFileSystem.close();
}
* 遞歸遍歷hdfs中所有的文件路徑
*/
@Test
public void getAllHdfsFilePath() throws URISyntaxException, IOException {
//獲取fs的客戶端
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
Path path = new Path("/");
FileStatus[] fileStatuses = fileSystem.listStatus(path);
//循環遍歷fileStatuses,如果是文件,打印文件的路徑,如果是文件夾,繼續遞歸進去
for (FileStatus fileStatus : fileStatuses){
if (fileStatus.isDirectory()){//文件夾
getDirectoryFiles(fileSystem,fileStatus);
}else{ //文件
System.out.println(fileStatus.getPath().toString());
}
}
//方法二:
System.out.println("方法二:利用官方提供API");
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true);
while (locatedFileStatusRemoteIterator.hasNext()){
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
System.out.println(next.getPath());
}
//關閉fs的客戶端
fileSystem.close();
}
/**
* 遞歸獲取文件路徑
*/
public void getDirectoryFiles(FileSystem fileSystem,FileStatus fileStatus) throws IOException {
//通過fileStatus獲取文件夾路徑
Path path = fileStatus.getPath(); //該fileStatus必定為一個文件夾
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus status:fileStatuses){
if (fileStatus.isDirectory()){
getDirectoryFiles(fileSystem,status);
}else{
System.out.println(fileStatus.getPath().toString());
}
}
}
/**
* 下載hdfs文件到本地
*/
@Test
public void copyHdfsToLocal() throws Exception {
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
FSDataInputStream inputStream = fileSystem.open(new Path("hdfs://node01:8020/aa/haha2.txt"));
FileOutputStream outputStream = new FileOutputStream(new File("d:\\install-log.txt"));
IOUtils.copy(inputStream,outputStream);
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
//方法二:利用官方API
//有報錯:java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
fileSystem.copyToLocalFile(new Path("hdfs://node01:8020/aa/haha2.txt"),new Path("file:///d:\\install-log2.txt"));
fileSystem.close();
}
/**
* hdfs上面創建文件夾
*/
@Test
public void createHdfsDir() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
fileSystem.mkdirs(new Path("/aa/bb/cc/"));
fileSystem.close();
}
/**
* hdfs的文件上傳
*/
@Test
public void uploadFileToHdfs() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
//注:new Path()中的字符串參數如果省略file:///或hdfs://的話,默認會在參數前添加hdfs://node01:8020,即,默認是hdfs路徑
fileSystem.copyFromLocalFile(false,new Path("file:///d:\\output.txt"),new Path("/aa/bb/cc"));
//第二種方法:通過流的方式
//輸出流,負責將數據輸出到hdfs的路徑上面去
FSDataOutputStream outputStream = fileSystem.create(new Path("/aa/bb/cc/empSel.hdfs"));
//通過輸入流讀取本地文件系統的文件
InputStream inputStream = new FileInputStream(new File("d:\\empSel.txt"));
IOUtils.copy(inputStream,outputStream);
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
fileSystem.close();
}
/**
* hdfs的權限校驗機制
*/
@Test
public void hdfsPermission() throws Exception{
/*
在所有節點的hdfs-site.xml中設置開啟權限驗證:
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>
普通的filesystem,執行時會報錯:org.apache.hadoop.security.AccessControlException:
Permission denied: user=Administrator, access=READ, inode="/config/core-site.xml":root:supergroup:-rw-------
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
*/
//通過偽造用戶來獲取分布式文件系統的客戶端
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root");
//從hdfs上下載文件到本地
FSDataInputStream inputStream = fileSystem.open(new Path("/config/core-site.xml"));
FileOutputStream outputStream = new FileOutputStream(new File("d:\\core-site.txt"));
IOUtils.copy(inputStream,outputStream);
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
// fileSystem.copyFromLocalFile(new Path("file:///d:\\transferIndex.txt"),new Path("/aa/bb/cc/"));
// fileSystem.delete(new Path("/aa/bb/cc/"),false);
fileSystem.close();
}
/**
* hdfs在上傳小文件的時候進行合并
* 在我們的hdfs 的shell命令模式下,可以通過命令行將很多的hdfs文件合并成一個大文件下載到本地:
* hdfs dfs -getmerge /config/*.xml ./hello.xml
* 上傳時也能將小文件合并到一個大文件里面去。
*/
@Test
public void mergeFile()throws Exception{
//獲取分布式文件系統
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.8.100:8020"), new Configuration(),"root");
FSDataOutputStream outputStream = fileSystem.create(new Path("/bigFile.xml"));
//獲取本地所有小文件的輸入流
//首先獲取本地文件系統
LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration());
FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("file:///D:\\上傳小文件合并"));
for (FileStatus fileStatus:fileStatuses){
Path path = fileStatus.getPath();
FSDataInputStream fsDataInputStream = localFileSystem.open(path);
IOUtils.copy(fsDataInputStream,outputStream);
IOUtils.closeQuietly(fsDataInputStream);
}
IOUtils.closeQuietly(outputStream);
fileSystem.close();
localFileSystem.close();
}
轉載于:https://www.cnblogs.com/mediocreWorld/p/10952959.html
總結
以上是生活随笔為你收集整理的hdfs深入:10、hdfs的javaAPI操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JSON.parse使用 之 Unexp
- 下一篇: 机器翻译 - 日期翻译