Using MapReduce to Import HDFS Data into MySQL
The code for the reverse direction, importing MySQL data into HDFS with MapReduce, is linked in the companion post.
Below is the code example for importing HDFS data into MySQL.
package com.zhen.mysqlToHDFS;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * Import HDFS data into MySQL.
 * Uses DBOutputFormat to write structured data under an HDFS path into MySQL.
 * The input is tab-separated; the first column is the key, the remaining
 * three columns are the data:
 * 0    1    Enzo    180.66
 * 1    2    Din     170.666
 */
public class DBOutputFormatApp extends Configured implements Tool {

    /**
     * JavaBean that must implement both Hadoop's serialization interface
     * (Writable) and the database serialization interface (DBWritable).
     * The official API describes DBInputFormat as:
     *   public class DBInputFormat<T extends DBWritable>
     *       extends InputFormat<LongWritable, T> implements Configurable
     * i.e. the Mapper key is fixed to LongWritable, and the value is a
     * custom JavaBean implementing DBWritable.
     */
    public static class BeanWritable implements Writable, DBWritable {

        private int id;
        private String name;
        private double height;

        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
            this.height = resultSet.getDouble(3);
        }

        public void write(PreparedStatement preparedStatement) throws SQLException {
            preparedStatement.setInt(1, id);
            preparedStatement.setString(2, name);
            preparedStatement.setDouble(3, height);
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.id = dataInput.readInt();
            this.name = dataInput.readUTF();
            this.height = dataInput.readDouble();
        }

        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(id);
            dataOutput.writeUTF(name);
            dataOutput.writeDouble(height);
        }

        public void set(int id, String name, double height) {
            this.id = id;
            this.name = name;
            this.height = height;
        }

        @Override
        public String toString() {
            return id + "\t" + name + "\t" + height;
        }
    }

    public static class DBOutputMapper extends Mapper<LongWritable, Text, NullWritable, BeanWritable> {

        private NullWritable outputKey;
        private BeanWritable outputValue;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            this.outputKey = NullWritable.get();
            this.outputValue = new BeanWritable();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // counter for records parsed and emitted successfully
            final Counter successCounter = context.getCounter("exec", "successfully");
            // counter for records that could not be parsed
            final Counter failedCounter = context.getCounter("exec", "failed");
            // parse the structured data
            String[] fields = value.toString().split("\t");
            // the data exported by the MySQL-to-HDFS job carries a long key in
            // column 0, so skip it and read the record starting at column 1
            if (fields.length > 3) {
                int id = Integer.parseInt(fields[1]);
                String name = fields[2];
                double height = Double.parseDouble(fields[3]);
                this.outputValue.set(id, name, height);
                context.write(outputKey, outputValue);
                successCounter.increment(1L);
            } else {
                failedCounter.increment(1L);
            }
        }
    }

    /**
     * DBOutputFormat requires the output key to be a type implementing
     * DBWritable, so the reducer swaps key and value.
     */
    public static class DBOutputReducer extends Reducer<NullWritable, BeanWritable, BeanWritable, NullWritable> {

        @Override
        protected void reduce(NullWritable key, Iterable<BeanWritable> values, Context context)
                throws IOException, InterruptedException {
            for (BeanWritable beanWritable : values) {
                context.write(beanWritable, key);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();
        // configure the database connection right after obtaining the Configuration
        DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
        Job job = Job.getInstance(configuration, DBOutputFormatApp.class.getSimpleName());
        job.setJarByClass(DBOutputFormatApp.class);
        job.setMapperClass(DBOutputMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BeanWritable.class);
        job.setReducerClass(DBOutputReducer.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        job.setOutputKeyClass(BeanWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, args[0]);
        // configure the target table and columns for the job's output
        DBOutputFormat.setOutput(job, "people", new String[] { "id", "name", "height" });
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static int createJob(String[] args) {
        Configuration conf = new Configuration();
        conf.set("dfs.datanode.socket.write.timeout", "7200000");
        conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
        int status = 0;
        try {
            status = ToolRunner.run(conf, new DBOutputFormatApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return status;
    }

    public static void main(String[] args) {
        args = new String[] { "/user/hadoop/mapreduce/mysqlToHdfs/people" };
        int status = createJob(args);
        System.exit(status);
    }
}
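Note that DBOutputFormat.setOutput does not create the target table, so the people table must already exist in the hadoop database before the job runs. A minimal sketch that creates it over JDBC, reusing the same connection settings as the job; the column types (INT, VARCHAR, DOUBLE) are assumptions that should be adjusted to match your source data:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Sketch: create the target table before running the job.
// Connection settings mirror DBConfiguration.configureDB above;
// the column types are assumptions, not taken from the original post.
public class CreatePeopleTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver"); // same driver as the job
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(
                "CREATE TABLE IF NOT EXISTS people ("
                + " id INT,"
                + " name VARCHAR(255),"
                + " height DOUBLE"
                + ")");
        }
    }
}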
Package the application into a jar, copy it to the server, and run it with the hadoop jar command:

hadoop jar /Users/FengZhen/Desktop/Hadoop/other/mapreduce_jar/HDFSToMysql.jar com.zhen.mysqlToHDFS.DBOutputFormatApp

Once the job finishes, the data will appear in the MySQL table.
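To confirm the import, you can query the table directly. A small JDBC sketch (the class name is illustrative) that prints the imported rows:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch: print the rows that landed in MySQL after the job completes.
public class VerifyPeopleTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name, height FROM people")) {
            while (rs.next()) {
                System.out.println(rs.getInt("id") + "\t"
                        + rs.getString("name") + "\t"
                        + rs.getDouble("height"));
            }
        }
    }
}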
Reposted from: https://www.cnblogs.com/EnzoDin/p/8429992.html