HDFS的API调用,创建Maven工程,创建一个非Maven工程,HDFS客户端操作数据代码示例,文件方式操作和流式操作
1. HDFS的java操作
hdfs在生產應用中主要是客戶端的開發,其核心步驟是從hdfs提供的api中構造一個HDFS的訪問客戶端對象,然后通過該客戶端對象操作(增刪改查)HDFS上的文件
1.1 搭建開發環境??????????????????
1.1.1創建Maven工程
快速創建一個Maven工程和目錄結構的方式是執行下面的命令:
mvn archetype:generate -DgroupId=com.toto.hadoop -DartifactId=hadoop -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false |
接著執行下面的命令: mvn -Pall eclipse:eclipse mvn clean mvn compile -Dmaven.test.skip=true mvn install -Dmaven.test.skip=true mvn package -Dmaven.test.skip=true |
最后將工程導入到Eclipse中: |
?
1、配置pom文件,引入下面的jar之后,其它相關的maven依賴的jar包,它會自動引入:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> ? <modelVersion>4.0.0</modelVersion> ? <groupId>com.toto.hadoop</groupId> ? <artifactId>hadoop</artifactId> ? <packaging>jar</packaging> ? <version>1.0-SNAPSHOT</version> ? <name>hadoop</name> ? <url>http://maven.apache.org</url> ? <dependencies> ??????? <dependency>? ??????????? <groupId>org.apache.hadoop</groupId>? ??????????? <artifactId>hadoop-common</artifactId>? ??????????? <version>2.5.1</version>? ??????? </dependency>? ??????? <dependency>? ??????????? <groupId>org.apache.hadoop</groupId>? ??????????? <artifactId>hadoop-hdfs</artifactId>? ??????????? <version>2.5.1</version>? ??????? </dependency>? ??????? <dependency>? ??????????? <groupId>org.apache.hadoop</groupId>? ??????????? <artifactId>hadoop-client</artifactId>? ??????????? <version>2.5.1</version>? ??????? </dependency> ??????? ??????? <dependency> ?????? ??? <groupId>junit</groupId> ?????? ??? <artifactId>junit</artifactId> ?????? ??? <version>4.12</version> ?????? ??? <scope>test</scope> ?????? </dependency> ? </dependencies> </project> |
?
1.1.2創建一個非Maven工程
注:如需手動引入jar包,hdfs的jar包----hadoop的安裝目錄的share下,他們分別是:
hadoop-2.8.0\share\hadoop\common\hadoop-common-2.8.0.jar hadoop-2.8.0\share\hadoop\common\lib hadoop-2.8.0\share\hadoop\hdfs\hadoop-hdfs-2.8.0.jar hadoop-2.8.0\share\hadoop\hdfs\lib |
如果是創建非Maven工程,步驟如下:
第一步:創建一個User Library
第二步:添加library
第三步:添加common中的所有的jar
第四步:添加hadoop-hdfs-2.8.0.jar
第五步:添加hdfs依賴的jar
最后到工程里面查看引包情況
接著上面的配置,可以進行hdfs客戶端調用的代碼的編寫了。
?
2、window下開發的說明
建議在linux下進行hadoop應用的開發,不會存在兼容性問題。如在window上做客戶端應用開發,需要設置以下環境:
A、用老師給的windows平臺下編譯的hadoop安裝包解壓一份到windows的任意一個目錄下
B、在window系統中配置HADOOP_HOME指向你解壓的安裝包目錄
C、在windows系統的path變量中加入HADOOP_HOME的bin目錄
1.2 獲取api中的客戶端對象
在java中操作hdfs,首先要獲得一個客戶端實例
Configuration conf = new Configuration() FileSystem fs = FileSystem.get(conf) |
?
而我們的操作目標是HDFS,所以獲取到的fs對象應該是DistributedFileSystem的實例;
get方法是從何處判斷具體實例化那種客戶端類呢?
——從conf中的一個參數 fs.defaultFS的配置值判斷;
?
如果我們的代碼中沒有指定fs.defaultFS,并且工程classpath下也沒有給定相應的配置,conf中的默認值就來自于hadoop的jar包中的core-default.xml,默認值為: file:///,則獲取的將不是一個DistributedFileSystem的實例,而是一個本地文件系統的客戶端對象
?
1.3 DistributedFileSystem實例對象所具備的方法
1.4 HDFS客戶端操作數據代碼示例:
1.4.1 文件的增刪改查
package com.toto.hadooptest; ? import java.net.URI; ? import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.junit.Before; import org.junit.Test; ? public class HdfsClient { ? ??? FileSystem fs = null; ??? ??? @Before ??? public void init() throws Exception { ??????? // 構造一個配置參數對象,設置一個參數:我們要訪問的hdfs的URI ??????? // 從而FileSystem.get()方法就知道應該是去構造一個訪問hdfs文件系統的客戶端,以及hdfs的訪問地址 ??????? // new Configuration();的時候,它就會去加載jar包中的hdfs-default.xml ??????? // 然后再加載classpath下的hdfs-site.xml ??????? Configuration conf = new Configuration(); ??????? conf.set("df.defaultFS", "hdfs://hadoop:9000"); ??????? /** ??????? ?* 參數優先級:1、客戶端代碼中設置的值 2、classpath下的用戶自定義配置文件 3、然后是服務器的默認配置 ??????? ?*/ ??????? conf.set("dfs.replication", "3"); ??????? ??????? // 獲取一個hdfs的訪問客戶端,根據參數,這個實例應該是DistributedFileSystem的實例 ??????? // fs = FileSystem.get(conf); ??????? ??????? //如果這樣去獲取,那conf里面就可以不要配"fs.defaultFS"參數,而且,這個客戶端的身份識別已經是hadoop用戶 ??????? //注意,最后的toto表示的是hadoop集群安裝的Linux用戶 ??????? fs = FileSystem.get(new URI("hdfs://hadoop:9000"),conf,"toto"); ??? } ??? ??? /** ??? ?* \brief hdfs上傳文件 ??? ?* ??? ?* 執行完成之后,進入Linux,執行一下命令,發現會出現以下列表 ??? ?* [toto@hadoop software]$ hadoop fs -ls /toto2 ?????? Found 1 items ?????? -rw-r--r--?? 3 toto supergroup??? 2286088 2017-05-30 13:25 /toto2/ik-analyzer-solr5-master.zip ??? ?*/ ??? @Test ??? public void testAddFileToHdfs() { ??????? try { ??????????? //要上傳的所在的本地路徑 ??????????? Path src = new Path("d:/ik-analyzer-solr5-master.zip"); ??????????? //要上傳的hdfs的目標路徑 ??????????? Path dst = new Path("/toto2"); ??????????? fs.copyFromLocalFile(src, dst); ??????????? fs.close(); ??????? } catch (Exception e) { ??????????? e.printStackTrace(); ??????? } ??? } ??? ??? /** ??? ?* \brief 從hdfs中復制文件到本地文件系統 ??? ?* @attention執行完成之后,進入E:/learnTempFolder這個文件夾,發現在改文件夾下已經有了自己拷貝下來的文件了 ??? ?* @author toto ??? ?* @date 2017年5月30日 ??? ?* @note? begin modify by 涂作權 2017年5月30日?? 原始創建 ??? ?*/ ??? @Test ??? public void testDownLoadFileToLocal() { ??????? try { ??????????? fs.copyToLocalFile(new Path("/toto2/findbugs-1.3.9"), new Path("E:/learnTempFolder")); ??????????? fs.close(); ??????? } catch (Exception e) { ??????????? e.printStackTrace(); ??????? } ??? } ??? ??? /** ??? ?* 創建目錄,刪除目錄,重命名 ??? ?* 執行之前hadoop的文件系統的目錄結構如下: ??? ?* [toto@hadoop sbin]$ hadoop fs -ls / ??????? Found 7 items ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-30 14:06 /aaa ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-29 14:01 /findbugs-1.3.9 ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-29 03:23 /hive ??????? drwx------?? - toto supergroup????????? 0 2017-05-29 14:47 /tmp ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-29 23:59 /toto ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-30 13:25 /toto2 ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-30 00:18 /user ??????? [toto@hadoop sbin]$ ??????? ??????? 執行之后的效果: ??????? [toto@hadoop sbin]$ hadoop fs -ls / ??????? Found 7 items ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-30 14:11 /a2 ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-29 14:01 /findbugs-1.3.9 ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-29 03:23 /hive ??????? drwx------?? - toto supergroup????????? 0 2017-05-29 14:47 /tmp ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-29 23:59 /toto ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-30 13:25 /toto2 ??????? drwxr-xr-x?? - toto supergroup????????? 0 2017-05-30 00:18 /user ??????? ??????? 總結: ??????? 1、發現/aaa文件夾被刪除了 ??????? 2、發現出現了/a2,但是最開始沒有,說明是創建的a1,然后又被改成的a2 ??? ?*/ ??? @Test ??? public void testMkdirDeleteAndRename() { ??????? try { ??????????? //創建目錄 ??????????? fs.mkdirs(new Path("/a1/b1/c1")); ??????????? ??????????? //刪除文件夾,如果是非空文件夾,參數2必須給值true ??????????? fs.delete(new Path("/aaa"),true); ??????????? ??????????? //重命名文件或文件夾 ??????????? fs.rename(new Path("/a1"), new Path("/a2")); ??????? } catch(Exception e) { ??? ??????? e.printStackTrace(); ??????? } ??? } ??? ??? /** ??? ?* \brief 查看目錄信息,只顯示文件? ??? ?*/ ??? @Test ??? public void testListFiles() { ??????? try { ??????????? //思考:為什么返回迭代器,而不是List之類的容器,這里的只有調用hasNext()的時候,才會返回實際的數據信息 ??????????? RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); ??????????? ??????????? while(listFiles.hasNext()) { ??????????????? LocatedFileStatus fileStatus = listFiles.next(); ??????????????? System.out.println(fileStatus.getPath().getName()); ??????????????? System.out.println(fileStatus.getBlockSize()); ??????????????? System.out.println(fileStatus.getPermission()); ??? ??????????? System.out.println(fileStatus.getLen()); ??????????????? BlockLocation[] blockLocations = fileStatus.getBlockLocations(); ??????????????? for(BlockLocation b1 : blockLocations) { ??????????????????? System.out.println("block-length:" + b1.getLength() + "--" + "block-offset:" + b1.getOffset()); ??????????????????? String[] hosts = b1.getHosts(); ??????????????????? for(String host : hosts) { ??????????????????????? System.out.println(host); ??????????????????? } ??????????????? } ??????????????? System.out.println("-----------------------------------"); ??????????? } ??????? } catch (Exception e) { ??????????? e.printStackTrace(); ??????? } ??? } ??? ??? /** ??? ?* 查看文件及文件夾信息 ??? ?* ??? ?* 顯示的內容如下: ??? ?* d--???????????? a2 ??? ?* d--???????????? findbugs-1.3.9 ??? ?* d--???????????? hive ??? ?* d--???????????? tmp ??? ?* d--???????????? toto ??? ?* d--???????????? toto2 ??? ?* d--???????????? user ??? ?*/ ??? @Test ??? public void testListAll() { ??????? try { ??????????? FileStatus[] listStatus = fs.listStatus(new Path("/")); ??????????? ??????????? String flag = "d--???????????? "; ??????????? for(FileStatus fstatus : listStatus) { ??????????????? if (fstatus.isFile())? flag = "f--???????? "; ??????????????? System.out.println(flag + fstatus.getPath().getName()); ??????????? } ??????? } catch (Exception e) { ??????????? e.printStackTrace(); ??????? } ??? } } |
?
1.4.2 通過流的方式訪問hdfs
package com.toto.hadooptest; ? import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.net.URI; ? import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.junit.Before; import org.junit.Test; ? /** ?* 相對那些封裝好的方法而言的更底層一些的操作方式 ?* 上層那些mapreduce,spark等運算框架,去hdfs中獲取數據的時候,就是調用這種底層的api ?*/ public class StreamAccess { ??? FileSystem fs = null; ? ??? @Before ??? public void init() throws Exception { ? ?????? Configuration conf = new Configuration(); ?????? fs = FileSystem.get(new URI("hdfs://hadoop:9000"), conf, "toto"); ? ??? } ??? ?????? /** ??? ?* 通過流的方式上傳文件到hdfs ??? ?* @throws Exception ??? ?*/ ??? @Test ??? public void testUpload() throws Exception { ?????? FSDataOutputStream outputStream = fs.create(new Path("/README.md"), true); ?????? FileInputStream inputStream = new FileInputStream("E:/learnTempFolder/dubbo-master/README.md"); ?????? ?????? //注意這里的IOUtils是org.apache.hadoop.io.IOUtils中的包,否則創建不成功 ?????? IOUtils.copyBytes(inputStream, outputStream, 4096); ??? } ? ??? /** ??? ?* 從hdfs上下載資源文件到本地 ??? ?*/ ??? @Test ??? public void testDownLoadFileToLocal() { ?????? try{ ?????????? //先獲取一個文件的輸入流---針對hdfs上的 ?????????? FSDataInputStream in = fs.open(new Path("/hive/apache-hive-2.0.0-bin.tar.gz")); ?????????? ?????????? //再構造一個文件的輸出流---針對本地的 ?????????? FileOutputStream out = new FileOutputStream(new File("E:/apache-hive-2.0.0-bin.tar.gz")); ?????????? ?????????? //再將輸入流中數據傳輸到輸出流 ?????????? IOUtils.copyBytes(in, out, 4096); ?????? } catch (Exception e) { ?????????? e.printStackTrace(); ?????? } ??? } ??? ??? /** ??? ?* hdfs支持隨機定位進行文件讀取,而且可以方便地讀取指定長度 ??? ?* 用于上層分布式運算框架并發處理數據 ??? ?*/ ??? @Test ??? public void testRandomAccess() { ?????? try { ?????????? //先獲取一個文件的輸入流---針對hdfs上的 ?????????? FSDataInputStream in = fs.open(new Path("/pom.xml")); ?????????? ?????????? //可以將流的起始偏移量進行自定義 ??? ?????? in.seek(200); ?????????? ?????????? //在構造一個文件的輸出流----針對本地的 ?????????? FileOutputStream out = new FileOutputStream(new File("E:/pom.xml")); ?????????? ?????????? //第三個參數的意思是獲取長度為3000的文本內容 ?????????? IOUtils.copyBytes(in, out,3000L, true); ?????? } catch (Exception e) { ?????????? e.printStackTrace(); ?????? } ??? } ??? ??? /** ??? ?* 顯示hdfs上文件的內容 ??? ?*/ ??? @Test ??? public void testCat() { ?????? try { ?????????? FSDataInputStream in = fs.open(new Path("/pom.xml")); ?????????? ?????????? IOUtils.copyBytes(in, System.out, 1024); ?????? } catch (Exception e) { ?????????? e.printStackTrace(); ?????? } ??? } ??? } |
?
1.4.3 場景編程
在mapreduce 、spark等運算框架中,有一個核心思想就是將運算移往數據,或者說,就是要在并發計算中盡可能讓運算本地化,這就需要獲取數據所在位置的信息并進行相應范圍讀取以下模擬實現:獲取一個文件的所有block位置信息,然后讀取指定block中的內容
???????? @Test ??? public void testCat2() { ?????? try { ?????????? FSDataInputStream in = fs.open(new Path("/toto2/findbugs-1.3.9")); ?????????? //拿到文件信息 ?????????? FileStatus[] listStatus = fs.listStatus(new Path("/toto2/findbugs-1.3.9")); ?????????? //獲取這個文件的所有的block的信息 ?????????? BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0, listStatus[0].getLen()); ?????????? //第一個block的長度 ?????????? long length = fileBlockLocations[0].getLength(); ?????????? //第一個block的起始偏移量 ?????????? long offset = fileBlockLocations[0].getOffset(); ?????????? ?????????? System.out.println(length); ?????????? System.out.println(offset); ?????????? ?????????? //獲取第一個block寫入輸出流 ?????????? IOUtils.copyBytes(in, System.out, (int)length); ?????????? byte[] b = new byte[4096]; ?????????? ?????????? @SuppressWarnings("resource") ?????????? FileOutputStream os = new FileOutputStream(new File("e:/block")); ?????????? while(in.read(offset,b,0,4096) != -1) { ????????????? os.write(b); ????????????? offset += 4096; ????????????? if (offset >= length) return; ?????????? } ?????????? ?????????? os.flush(); ?????????? os.close(); ?????????? in.close(); ?????? }? catch (Exception e) { ?????????? e.printStackTrace(); ?????? } ??? } |
?
?
?
?
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的HDFS的API调用,创建Maven工程,创建一个非Maven工程,HDFS客户端操作数据代码示例,文件方式操作和流式操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 华创证券是上市公司吗
- 下一篇: HDFS的工作机制,HDFS写数据流程,