Java HDFS Connection Pool: Managing HDFS Connections in Java with a Connection Pool
A write-up of connecting to Hadoop and working with HDFS through the Java API, with the connections managed by a connection pool.
I had done this kind of development before and assumed it would be painless, but it still went bumpily, which left me a little annoyed with myself. So I'm recording this tiresome process as a warning not to let my ambitions outrun my hands!
Part 1: Add the required jar dependencies
```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.0</version>
</dependency>
```
Part 2: The basic flow of building the connection pool
2.1 The project itself is a standard Spring Boot setup with the usual integrations.
2.2 The hadoop-related package structure is shown below (even I feel this layout exposes my low-end skills [manual laugh-cry]).
2.3 A quick diagram of the development approach.
Part 3: The code
HdfsConfig, the Spring configuration class:

```java
import com.cmcc.datacenter.hdfs.client.HdfsClient;
import com.cmcc.datacenter.hdfs.client.HdfsFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HdfsConfig {

    @Value("${hadoop.hdfs.ip}")
    private String hdfsServerIp;

    @Value("${hadoop.hdfs.port}")
    private String hdfsServerPort;

    @Value("${hadoop.hdfs.pool.maxTotal}")
    private int maxTotal;

    @Value("${hadoop.hdfs.pool.maxIdle}")
    private int maxIdle;

    @Value("${hadoop.hdfs.pool.minIdle}")
    private int minIdle;

    @Value("${hadoop.hdfs.pool.maxWaitMillis}")
    private int maxWaitMillis;

    @Value("${hadoop.hdfs.pool.testWhileIdle}")
    private boolean testWhileIdle;

    @Value("${hadoop.hdfs.pool.minEvictableIdleTimeMillis}")
    private long minEvictableIdleTimeMillis = 60000;

    @Value("${hadoop.hdfs.pool.timeBetweenEvictionRunsMillis}")
    private long timeBetweenEvictionRunsMillis = 30000;

    @Value("${hadoop.hdfs.pool.numTestsPerEvictionRun}")
    private int numTestsPerEvictionRun = -1;

    // Spring calls init() after constructing the bean and stop() on shutdown.
    @Bean(initMethod = "init", destroyMethod = "stop")
    public HdfsClient HdfsClient() {
        return new HdfsClient();
    }

    /**
     * testWhileIdle - validate objects while they sit idle; default false
     * minEvictableIdleTimeMillis - minimum idle time before an object may be evicted
     * timeBetweenEvictionRunsMillis - interval (ms) between eviction scans; a negative value disables the evictor thread, default -1
     * numTestsPerEvictionRun - maximum number of objects examined per eviction run
     */
    @Bean
    public HdfsPoolConfig HdfsPoolConfig() {
        HdfsPoolConfig hdfsPoolConfig = new HdfsPoolConfig();
        hdfsPoolConfig.setTestWhileIdle(testWhileIdle);
        hdfsPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        hdfsPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        hdfsPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
        hdfsPoolConfig.setMaxTotal(maxTotal);
        hdfsPoolConfig.setMaxIdle(maxIdle);
        hdfsPoolConfig.setMinIdle(minIdle);
        hdfsPoolConfig.setMaxWaitMillis(maxWaitMillis);
        return hdfsPoolConfig;
    }

    @Bean
    public HdfsFactory HdfsFactory() {
        return new HdfsFactory("hdfs://" + hdfsServerIp + ":" + hdfsServerPort);
    }
}
```
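For reference, the @Value placeholders above expect entries along these lines in application.properties (the values here are illustrative, not from the original post):

```properties
# HDFS connection settings (example values)
hadoop.hdfs.ip=192.168.1.100
hadoop.hdfs.port=9000

# commons-pool2 settings (example values)
hadoop.hdfs.pool.maxTotal=8
hadoop.hdfs.pool.maxIdle=8
hadoop.hdfs.pool.minIdle=0
hadoop.hdfs.pool.maxWaitMillis=10000
hadoop.hdfs.pool.testWhileIdle=true
hadoop.hdfs.pool.minEvictableIdleTimeMillis=60000
hadoop.hdfs.pool.timeBetweenEvictionRunsMillis=30000
hadoop.hdfs.pool.numTestsPerEvictionRun=-1
```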
HdfsPoolConfig, the pool configuration class:

```java
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HdfsPoolConfig extends GenericObjectPoolConfig {

    public HdfsPoolConfig() {
    }

    /**
     * testWhileIdle - validate objects while they sit idle; default false
     * minEvictableIdleTimeMillis - minimum idle time before an object may be evicted
     * timeBetweenEvictionRunsMillis - interval (ms) between eviction scans; a negative value disables the evictor thread, default -1
     * numTestsPerEvictionRun - maximum number of objects examined per eviction run
     */
    public HdfsPoolConfig(boolean testWhileIdle, long minEvictableIdleTimeMillis,
                          long timeBetweenEvictionRunsMillis, int numTestsPerEvictionRun) {
        this.setTestWhileIdle(testWhileIdle);
        this.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        this.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        this.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
    }
}
```
HdfsClient, the entry point that business code calls:

```java
package com.cmcc.datacenter.hdfs.client;

import com.cmcc.datacenter.hdfs.config.HdfsPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

public class HdfsClient {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private HdfsPool hdfsPool;

    @Autowired
    private HdfsPoolConfig hdfsPoolConfig;

    @Autowired
    private HdfsFactory hdfsFactory;

    // Called by Spring via initMethod: builds the pool once factory and config are injected.
    public void init() {
        hdfsPool = new HdfsPool(hdfsFactory, hdfsPoolConfig);
    }

    // Called by Spring via destroyMethod.
    public void stop() {
        hdfsPool.close();
    }

    public long getPathSize(String path) throws Exception {
        Hdfs hdfs = null;
        try {
            hdfs = hdfsPool.borrowObject();
            return hdfs.getContentSummary(path).getLength();
        } catch (Exception e) {
            logger.error("[HDFS] failed to get path size", e);
            throw e;
        } finally {
            // Always hand the connection back, even on failure.
            if (null != hdfs) {
                hdfsPool.returnObject(hdfs);
            }
        }
    }

    public List<String> getBasePath() {
        Hdfs hdfs = null;
        try {
            hdfs = hdfsPool.borrowObject();
            return hdfs.listFileName();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            if (null != hdfs) {
                hdfsPool.returnObject(hdfs);
            }
        }
    }
}
```
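As a minimal sketch of how business code would use this (the service class and path below are made up for illustration):

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Hypothetical caller: any Spring-managed bean can inject HdfsClient
// and use it without ever touching the pool directly.
@Service
public class HdfsUsageExample {

    @Autowired
    private HdfsClient hdfsClient;

    public void printTmpSize() throws Exception {
        // Borrowing and returning the pooled connection happens inside HdfsClient.
        long size = hdfsClient.getPathSize("/tmp"); // example path
        System.out.println("/tmp occupies " + size + " bytes");
    }
}
```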
HdfsFactory, the factory commons-pool2 uses to create, validate, and destroy Hdfs instances:

```java
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;

import java.io.IOException;

public class HdfsFactory implements PooledObjectFactory<Hdfs> {

    private final String url;

    public HdfsFactory(String url) {
        this.url = url;
    }

    // Called by the pool whenever it needs a new connection.
    @Override
    public PooledObject<Hdfs> makeObject() throws Exception {
        Hdfs hdfs = new Hdfs(url);
        hdfs.open();
        return new DefaultPooledObject<>(hdfs);
    }

    // Called when the pool discards a connection.
    @Override
    public void destroyObject(PooledObject<Hdfs> pooledObject) throws Exception {
        Hdfs hdfs = pooledObject.getObject();
        hdfs.close();
    }

    // Called by the evictor when testWhileIdle is enabled.
    @Override
    public boolean validateObject(PooledObject<Hdfs> pooledObject) {
        Hdfs hdfs = pooledObject.getObject();
        try {
            return hdfs.isConnected();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public void activateObject(PooledObject<Hdfs> pooledObject) throws Exception {
    }

    @Override
    public void passivateObject(PooledObject<Hdfs> pooledObject) throws Exception {
    }
}
```
HdfsPool, the pool itself — a thin subclass of GenericObjectPool:

```java
package com.cmcc.datacenter.hdfs.client;

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HdfsPool extends GenericObjectPool<Hdfs> {

    public HdfsPool(PooledObjectFactory<Hdfs> factory) {
        super(factory);
    }

    public HdfsPool(PooledObjectFactory<Hdfs> factory, GenericObjectPoolConfig config) {
        super(factory, config);
    }

    public HdfsPool(PooledObjectFactory<Hdfs> factory, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
        super(factory, config, abandonedConfig);
    }
}
```
Hdfs, the underlying connection wrapper around Hadoop's FileSystem:

```java
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

public class Hdfs {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private FileSystem fs;
    private String coreResource;
    private String hdfsResource;
    private final String url;
    private static final String NAME = "fs.hdfs.impl";

    public Hdfs(String url) {
        this.url = url;
    }

    public void open() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", url);
            System.out.println("url is " + url);
            fs = FileSystem.get(conf);
            logger.info("[Hadoop] instance created");
        } catch (Exception e) {
            logger.error("[Hadoop] failed to create instance", e);
        }
    }

    public void close() {
        try {
            if (null != fs) {
                fs.close();
                logger.info("[Hadoop] instance closed");
            }
        } catch (Exception e) {
            logger.error("[Hadoop] failed to close instance", e);
        }
    }

    // Used by the pool to validate a connection: a cheap existence check on the root path.
    public boolean isConnected() throws IOException {
        return fs.exists(new Path("/"));
    }

    public boolean exists(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.exists(hdfsPath);
    }

    public FileStatus getFileStatus(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.getFileStatus(hdfsPath);
    }

    public ContentSummary getContentSummary(String path) throws IOException {
        ContentSummary contentSummary = null;
        Path hdfsPath = new Path(path);
        if (fs.exists(hdfsPath)) {
            contentSummary = fs.getContentSummary(hdfsPath);
        }
        return contentSummary; // null when the path does not exist
    }

    public List<String> listFileName() throws IOException {
        List<String> res = Lists.newArrayList();
        FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : fileStatuses) {
            res.add(fileStatus.getPath() + ": type -- " + (fileStatus.isDirectory() ? "directory" : "file"));
        }
        return res;
    }
}
```
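One caveat about open() worth knowing: by default FileSystem.get(conf) returns a JVM-wide cached instance per scheme/authority/user, so the pooled Hdfs objects may all wrap the same FileSystem, and closing one closes it for everyone. If each pooled connection should be truly independent, Hadoop's standard cache-disable switch can be set inside open() — a sketch, not part of the original code:

```java
Configuration conf = new Configuration();
conf.set("fs.defaultFS", url);
// Standard Hadoop switch: bypass the JVM-wide FileSystem cache for hdfs://
// so each pooled connection owns a distinct FileSystem instance.
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
fs = FileSystem.get(conf);
```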
Part 4: Summary
Six classes in total — once you sort out who does what, it is quite easy.
Spring's bean management and commons-pool2's connection management are mixed together here, which is what makes it look a little messy.
1. @Configuration on the HdfsConfig class marks it as a configuration class, playing the same role as the beans tag in a spring XML file; Spring Boot scans it and registers the beans it declares. @Bean(initMethod = "init", destroyMethod = "stop") tells Spring to call the bean's init method after constructing it and its stop method when destroying it.
2. HdfsClient is the class business code calls. When Spring initializes it, init() builds the HdfsPool (the pool of Hdfs connections). Its other methods are thin wrappers around the methods on Hdfs: borrow an instance from the pool, call the instance method, return the instance.
3. HdfsPoolConfig extends GenericObjectPoolConfig from commons-pool2. It is Spring-managed, serves as the pool's configuration, and is passed in when the HdfsPool is created.
4. HdfsFactory implements PooledObjectFactory from commons-pool2. It is Spring-managed and acts as the factory that creates connection instances; it is passed in when the HdfsPool is created, and the pool actually obtains its connections through it.
5. HdfsPool extends GenericObjectPool from commons-pool2 — it is the connection pool.
6. Hdfs is the underlying connection instance; every read/write operation is implemented here, and only acquiring and destroying connections is left to the pool.
Note: Spring manages these classes only because the project itself is Spring Boot and that makes it convenient; it is not mandatory — you are perfectly free to new everything yourself.
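As a minimal sketch of that manual wiring (the address and pool sizing below are example values, not from the original project):

```java
public class ManualWiringExample {
    public static void main(String[] args) throws Exception {
        // Equivalent of HdfsConfig + HdfsClient.init(), without Spring.
        HdfsPoolConfig config = new HdfsPoolConfig(true, 60000L, 30000L, -1);
        config.setMaxTotal(8); // example sizing
        HdfsFactory factory = new HdfsFactory("hdfs://192.168.1.100:9000"); // example address

        HdfsPool pool = new HdfsPool(factory, config);
        Hdfs hdfs = null;
        try {
            hdfs = pool.borrowObject();              // take a live connection
            System.out.println(hdfs.listFileName()); // use it
        } finally {
            if (hdfs != null) {
                pool.returnObject(hdfs);             // always hand it back
            }
            pool.close();
        }
    }
}
```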
Part 5: Some pits that aren't really pits but still have to be mentioned.
1. I honestly forgot that using the Java API on Windows to reach a remote Hadoop needs some extra ritual.
The error: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset
The fix:
1. Unpack the downloaded hadoop-2.9.0.tar archive to any location you like on the local machine;
2. Download the extra files Windows needs (hadoop.dll, winutils.exe, etc. matching hadoop 2.9.0). This is the crucial step — and a rant: a certain SDN site has gone money-mad, squatting on the first ten pages of Baidu results and charging C-coins for every download.
Enough said; here is the GitHub link instead: github地址
3. With those Windows files in hand, do the following:
3.1: Unpack them into the bin directory of the hadoop-2.9.0 you extracted (add the missing files; do not overwrite existing ones, unless you enjoy inventive ways of breaking things or genuinely want to experiment)
3.2: Copy hadoop.dll into C:\Windows\System32
3.3: Add a HADOOP_HOME environment variable pointing at the hadoop directory
3.4: Add %HADOOP_HOME%\bin to PATH; if that is not enough, add %HADOOP_HOME%\sbin as well.
3.5: Restart your IDE (eclipse, IntelliJ IDEA, or whatever you edit in)
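Alternatively — a common workaround rather than something from the original post — the same hint can be supplied in code through the hadoop.home.dir system property named in the error, as long as it is set before the first FileSystem call (the path below is an example):

```java
// Somewhere that runs before any HDFS access, e.g. at the top of main():
public static void main(String[] args) {
    // hadoop.home.dir is the property named in the error above; example path
    // pointing at the locally unpacked distribution with winutils.exe in bin.
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.9.0");
    // ... start the application / create connections afterwards ...
}
```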
Original post: https://www.cnblogs.com/peripateticism/p/10895903.html