Integrating Hadoop HDFS into a Spring Boot Project
I recently needed to integrate Hadoop's HDFS into a Spring Boot project. I hit a lot of problems along the way, and there was no ready-made tutorial online, so I had to work everything out by trial and error; it took a long time of fiddling before the project would even come up. I hope this writeup helps.
The point of the integration is to fetch a List from the database, generate a CSV file from that List, and load it into HDFS for machine learning.
This article covers how to get the integration working and how to turn the List data into a CSV file stored in HDFS.
A quick summary of the problems you are likely to run into:
1. The project uses @Slf4j (Logback), but Hadoop transitively pulls in log4j, which causes a logging conflict.
2. After the integration, Tomcat may fail to start.
3. The dependencies often fail to download completely (I fixed this by simply retrying the download until it succeeded).
Below is my pom.xml. This file matters most: getting the integration to work is mainly a matter of getting the pom.xml right.
For reference:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ratings</groupId>
    <artifactId>ratings</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <name>ratings</name>
    <description>ratings</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!--<tomcat.version>8.0.9</tomcat.version>
        <dependency>
            <groupId>org.apache.tomcat</groupId>
            <artifactId>tomcat-juli</artifactId>
            <version>${tomcat.version}</version>
        </dependency> -->

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>net.sourceforge.javacsv</groupId>
            <artifactId>javacsv</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- hot deployment (commented out)
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency> -->

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
            <version>2.0.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The pom.xml above is the most important piece and was the key to getting the integration working; once this pom.xml builds, the problems are basically solved.
My stack is Spring Boot and MyBatis plus HDFS. The key code is the HDFS operations, which I share below for reference.
I generate the CSV inside a ServiceImpl, where the data is packaged and loaded into HDFS, so you need to know three things:
1. how to create the CSV file in HDFS;
2. how to write the data into the CSV file;
3. how to download an HDFS file to the local machine.
So first, creating the file. Here is my code:
// Create the directory in HDFS (if it does not exist yet) along with an empty .csv file inside it
public String mkdir(String filename, String filepath) throws IOException {
    Configuration conf = new Configuration();
    // `name` and `url` are fields of this class, e.g. name = "fs.defaultFS"
    // and url = "hdfs://<namenode>:9000" for your cluster
    conf.set(name, url);
    Path srcPath = new Path(filepath);
    FileSystem fs = srcPath.getFileSystem(conf);
    boolean ishere = fs.isDirectory(srcPath);
    if (ishere) {
        System.out.println("Directory already exists!");
        byte[] content = "".getBytes();
        String path = filepath + "/" + filename + ".csv";
        Path filePath = new Path(path);
        // create the file once, write the (empty) content, then close the stream
        FSDataOutputStream outputStream = fs.create(filePath);
        outputStream.write(content);
        outputStream.close();
        System.out.println("CSV file created!");
        return path;
    } else {
        boolean isok = fs.mkdirs(srcPath);
        if (isok) {
            System.out.println("Directory created!");
            byte[] content = "".getBytes();
            String path = filepath + "/" + filename + ".csv";
            Path filePath = new Path(path);
            FSDataOutputStream outputStream = fs.create(filePath);
            outputStream.write(content);
            outputStream.close();
            System.out.println("CSV file created!");
            return path;
        } else {
            System.out.println("Failed to create the directory!");
            return "500";
        }
    }
}
That covers creating the file.
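To make the method above concrete, here is a minimal, self-contained sketch of the same sequence of calls. The namenode address (hdfs://localhost:9000) and the /user/demo path are assumptions for illustration; they stand in for whatever the `name` and `url` fields hold in your project:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MkdirDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // this is what the article's conf.set(name, url) boils down to
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        Path dir = new Path("/user/demo");
        FileSystem fs = dir.getFileSystem(conf);
        if (!fs.isDirectory(dir)) {
            fs.mkdirs(dir);                       // create the target directory
        }
        Path csv = new Path(dir, "ratings.csv");
        try (FSDataOutputStream out = fs.create(csv)) {
            // empty file; the CSV writer fills it in later
        }
        System.out.println("created " + csv);
    }
}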
Next, writing the data into the CSV (the same approach works whether the CSV is in HDFS or on your local Windows machine; I have tested both):
@Override
public String u_output(int userId, String initPath) {
    HdfsFile hdfs = new HdfsFile();
    List<Ratings> list = baseMapper.u_output(userId);
    if (list != null) {
        for (Ratings ratings : list) {
            ratings.setUserId(userId);
        }
        if (list.size() > 0) {
            try {
                DateUntil date = new DateUntil();
                String filename = date.getDate() + userId;
                System.out.println("File name: " + filename);
                String filepath = hdfs.mkdir(filename, initPath);
                System.out.println("File path: " + filepath);
                // compare strings with equals(), not != / ==
                if (!"500".equals(filepath) && !"".equals(filepath)) {
                    CsvWriter csvWriter = null;
                    try {
                        csvWriter = new CsvWriter(filepath, ',', Charset.forName("UTF-8"));
                        String[] csvHeader = { "userId", "movieId" };
                        csvWriter.writeRecord(csvHeader);
                        for (int i = 0; i < list.size(); i++) {
                            Ratings data = list.get(i);
                            String uid = String.valueOf(data.getUserId());
                            String mid = String.valueOf(data.getMovieId());
                            String[] csvContent = { uid, mid };
                            csvWriter.writeRecord(csvContent);
                        }
                    } finally {
                        if (csvWriter != null) {
                            csvWriter.close();
                        }
                        System.out.println("-------- CSV file written --------");
                        // delete the local .crc checksum file Hadoop leaves behind
                        String path = initPath + "/." + filename + ".csv.crc";
                        System.out.println("crc file path: " + path);
                        File fn = new File(path);
                        if (fn.exists()) {
                            fn.delete();
                            System.out.println("crc file deleted");
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return "200";
    } else {
        return "500";
    }
}
That is the implementation of the interface method; how you adapt the parameters (userId and initPath) depends on your own needs. (Notice the JavaCSV dependency in the pom.xml — that is the library that lets us write the CSV file quickly!)
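One caveat worth spelling out: constructing CsvWriter from a plain String path writes through the local filesystem. If the CSV really lives on a remote HDFS cluster, JavaCSV can also wrap the stream returned by FileSystem.create() and write records straight into HDFS. A minimal sketch of that variant, with the namenode address and the /user/demo/ratings.csv target again being assumptions:

import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.csvreader.CsvWriter;

public class HdfsCsvDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000"); // assumed namenode
        Path csv = new Path("/user/demo/ratings.csv");
        FileSystem fs = csv.getFileSystem(conf);

        try (FSDataOutputStream out = fs.create(csv)) {
            // CsvWriter accepts any OutputStream, not just a local file path
            CsvWriter writer = new CsvWriter(out, ',', Charset.forName("UTF-8"));
            writer.writeRecord(new String[] { "userId", "movieId" });
            writer.writeRecord(new String[] { "1", "42" });
            writer.flush(); // flush records before the HDFS stream is closed
        }
    }
}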
The last piece is downloading the CSV file.
Here is the code:
// srcPath is the HDFS path, dstPath is the local destination directory
public void downloadFile(String dstPath, String srcPath) throws IOException {
    Path path = new Path(srcPath);
    Configuration conf = new Configuration();
    // same `name`/`url` fields as in mkdir()
    conf.set(name, url);
    FileSystem hdfs = path.getFileSystem(conf);

    File rootfile = new File(dstPath);
    if (!rootfile.exists()) {
        rootfile.mkdirs();
    }
    try {
        if (hdfs.isFile(path)) {
            // single file: copy it down only if it is a CSV
            String fileName = path.getName();
            if (fileName.toLowerCase().endsWith("csv")) {
                FSDataInputStream in = null;
                FileOutputStream out = null;
                try {
                    in = hdfs.open(path);
                    File srcfile = new File(rootfile, path.getName());
                    if (!srcfile.exists())
                        srcfile.createNewFile();
                    out = new FileOutputStream(srcfile);
                    IOUtils.copyBytes(in, out, 4096, false);
                    System.out.println("Download complete!");
                } finally {
                    IOUtils.closeStream(in);
                    IOUtils.closeStream(out);
                }
            }
        } else if (hdfs.isDirectory(path)) {
            // directory: mirror it into a matching subdirectory of dstPath
            String filePath = path.toString();
            String[] subPath = filePath.split("/");
            String newdstPath = dstPath + subPath[subPath.length - 1] + "/";
            System.out.println("newdstPath=======" + newdstPath);
            FileStatus[] srcFileStatus = hdfs.listStatus(path);
            if (srcFileStatus != null) {
                for (FileStatus status : srcFileStatus) {
                    // recurse into the directory's entries
                    downloadFile(newdstPath, status.getPath().toString());
                }
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
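To round things off, a usage sketch, assuming the methods above live in the HdfsFile class used by the service code, and using placeholder paths (D:/data/ and /user/demo) for illustration:

// Usage sketch for downloadFile(); the paths are placeholders for your setup.
public class DownloadDemo {
    public static void main(String[] args) throws java.io.IOException {
        HdfsFile hdfs = new HdfsFile();
        // Copies /user/demo (and every CSV beneath it) into D:/data/.
        // Keep the trailing slash: subdirectory names are concatenated
        // directly onto dstPath when the method recurses.
        hdfs.downloadFile("D:/data/", "/user/demo");
    }
}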
Summary
That is the full walkthrough of integrating Hadoop's HDFS into a Spring Boot project. I hope this article helps you solve the problems you run into.