MapReduce: reading HBase data and writing it to HDFS
The Java code is as follows:
import com.google.common.collect.Lists;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

public class HbaseToHdfs {

    private static final Logger logger = Logger.getLogger(HbaseToHdfs.class);

    public static Configuration conf = HBaseConfiguration.create();

    static {
        conf.set("hbase.master", "xxx:xxx");
        conf.set("mapreduce.output.fileoutputformat.compress", "false");
        conf.addResource(new Path("xxx/core-site.xml"));
        conf.addResource(new Path("xxx/hdfs-site.xml"));
        conf.addResource(new Path("xxx/hbase-site.xml"));
        conf.set("hbase.client.pause", "2000");
        conf.set("hbase.client.retries.number", "100");
        conf.set("hbase.client.operation.timeout", "500000");
    }

    public static void main(String[] args) throws Exception {
        // Load the column list for the target table from the properties file (args[2]).
        InputStream foin = new FileInputStream(args[2]);
        Properties prop = new Properties();
        prop.load(foin);
        foin.close();
        String columns = prop.getProperty(args[0]).trim();
        conf.set("columns", columns);

        Job job = Job.getInstance(conf, "xxx");
        job.setJarByClass(HbaseToHdfs.class);
        job.setMapperClass(HbaseToHdfs.MyMapper.class);
        job.setNumReduceTasks(0); // map-only job: mapper output goes straight to HDFS

        TableMapReduceUtil.initTableMapperJob(initScans(job, args[0]),
                HbaseToHdfs.MyMapper.class, NullWritable.class, Text.class, job);

        // Remove the output directory (args[1]) if it already exists.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        long start = System.currentTimeMillis();
        try {
            job.waitForCompletion(true);
        } finally {
            // Open up permissions on the output directory and its files.
            fs.setPermission(new Path(args[1]), new FsPermission("777"));
            FileStatus[] files = fs.listStatus(new Path(args[1]));
            for (FileStatus fileStatus : files) {
                Path p = fileStatus.getPath();
                fs.setPermission(p, new FsPermission("777"));
            }
            fs.close();
            long end = System.currentTimeMillis();
            logger.info("Job<" + job.getJobName() + "> succeeded: " + job.isSuccessful()
                    + "; start: " + start + "; end: " + end
                    + "; elapsed: " + (end - start) + "ms");
        }
    }

    private static List<Scan> initScans(Job job, String tableName) {
        Configuration conf = job.getConfiguration();
        Scan scan = new Scan();
        // Tell MultiTableInputFormat which table this scan targets.
        scan.setAttribute("scan.attributes.table.name", Bytes.toBytes(tableName));
        return Lists.newArrayList(new Scan[] { scan });
    }

    public static class MyMapper extends TableMapper<NullWritable, Text> {

        String columns = "";

        @Override
        protected void map(ImmutableBytesWritable key, Result r,
                Mapper<ImmutableBytesWritable, Result, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            if (r != null) {
                String all = "";
                int j = 0;
                // Concatenate the configured columns into one comma-separated output line.
                for (String column : this.columns.split(",")) {
                    j++;
                    String s = "";
                    try {
                        byte[] p = r.getValue("xxx".getBytes(), column.getBytes());
                        if (p != null) {
                            s = new String(p, "UTF-8");
                            // Strip line breaks and replace separators that would break the output format.
                            s = s.replaceAll("\\n", "").replaceAll("\\r", "");
                            s = s.replaceAll(",", ".");
                            s = s.replaceAll(";", ".");
                            if ("NULL".equals(s)) {
                                s = "";
                            }
                        }
                    } catch (Exception e) {
                        // Fall back to an empty value if the cell cannot be read or decoded.
                        s = "";
                    }
                    if (j == 1) {
                        all = s;
                    } else {
                        all = all + "," + s;
                    }
                }
                context.write(NullWritable.get(), new Text(all));
            }
        }

        @Override
        protected void setup(Mapper<ImmutableBytesWritable, Result, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            this.columns = conf.get("columns");
        }
    }
}
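Note that initScans only tells the job which table to read by setting the scan.attributes.table.name attribute on the Scan. For larger tables it is usually worth tuning the scan as well; a minimal sketch (the caching value below is just an example, not part of the original code):

Scan scan = new Scan();
scan.setAttribute("scan.attributes.table.name", Bytes.toBytes(tableName));
scan.setCaching(500);        // fetch more rows per RPC round trip
scan.setCacheBlocks(false);  // avoid filling the region server block cache during a full scan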
Usage:
The job takes three arguments.
The first is the HBase table name.
The second is the HDFS output path.
The third is the path to a properties file.
Each entry in the properties file has the form:
hbase_table_name=comma-separated list of the columns to be written to Hive
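For example (the table name, column names, jar name, and paths below are hypothetical), a properties file for a table user_info could contain:

user_info=id,name,age,create_time

and the job could then be launched with something like:

hadoop jar hbase-to-hdfs.jar HbaseToHdfs user_info /tmp/user_info_export /path/to/columns.properties

The mapper writes one comma-separated line per HBase row into /tmp/user_info_export.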
The Maven dependencies are as follows:
<dependencies>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.0.0-cdh5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.0.0-cdh5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>1.0.0-cdh5.5.0</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-protocol</artifactId>
        <version>1.0.0-cdh5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.0.0-cdh5.5.0</version>
    </dependency>
</dependencies>
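The HBase artifacts above carry CDH version numbers, so they are normally resolved from Cloudera's Maven repository rather than Maven Central. Assuming a standard CDH 5 setup, a repositories block along these lines can be added to the pom:

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>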