How to Use Hadoop's JobControl
Reposted from: http://qindongliang.iteye.com/blog/2064281
If a MapReduce application needs multiple jobs, and those jobs have dependencies between them (say Job3 depends on both Job1 and Job2), you need JobControl. It is used like this:
public static int handleJobChain(Job job1, Job job2, Job job3, String chainName) throws IOException {
    ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
    controlledJob1.setJob(job1);

    ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
    controlledJob2.setJob(job2);

    ControlledJob controlledJob3 = new ControlledJob(job3.getConfiguration());
    controlledJob3.setJob(job3);
    // job3 will not start until both job1 and job2 have succeeded
    controlledJob3.addDependingJob(controlledJob1);
    controlledJob3.addDependingJob(controlledJob2);

    JobControl jc = new JobControl(chainName);
    jc.addJob(controlledJob1);
    jc.addJob(controlledJob2);
    jc.addJob(controlledJob3);

    // JobControl implements Runnable, so run it on its own thread
    Thread jcThread = new Thread(jc);
    jcThread.start();
    while (true) {
        if (jc.allFinished()) {
            System.out.println(jc.getSuccessfulJobList());
            jc.stop();
            return 0;
        }
        if (jc.getFailedJobList().size() > 0) {
            System.out.println(jc.getFailedJobList());
            jc.stop();
            return 1;
        }
    }
}
Each Job needs its own Configuration; JobControl then links the multiple Jobs together.
Since JobControl implements the Runnable interface, it can be run on a thread and stopped afterwards with its stop() method. If you do not run it on a separate Thread, the program will never exit after all the Hadoop jobs have finished, even though the results have been fully written.
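A caveat on the busy-wait loop in handleJobChain: it spins at full CPU until the jobs finish. A minimal variant I would suggest (the 500 ms interval and the isEmpty-based exit condition are my own choices, not from the original) sleeps between checks:

Thread jcThread = new Thread(jc);
jcThread.start();
// assumes the enclosing method declares throws InterruptedException
while (!jc.allFinished() && jc.getFailedJobList().isEmpty()) {
    Thread.sleep(500); // poll twice a second instead of spinning
}
System.out.println(jc.allFinished() ? jc.getSuccessfulJobList() : jc.getFailedJobList());
jc.stop(); // lets the JobControl thread terminate so the JVM can exit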
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
使用Hadoop里面的MapReduce來處理海量數據是非常簡單方便的,但有時候我們的應用程序,往往需要多個MR作業,來計算結果,比如說一個最簡單的使用MR提取海量搜索日志的TopN的問題,注意,這里面,其實涉及了兩個MR作業,第一個是詞頻統計,第兩個是排序求TopN,這顯然是需要兩個MapReduce作業來完成的。其他的還有,比如一些數據挖掘類的作業,常常需要迭代組合好幾個作業才能完成,這類作業類似于DAG類的任務,各個作業之間是具有先后,或相互依賴的關系,比如說,這一個作業的輸入,依賴上一個作業的輸出等等。?
在Hadoop里實際上提供了,JobControl類,來組合一個具有依賴關系的作業,在新版的API里,又新增了ControlledJob類,細化了任務的分配,通過這兩個類,我們就可以輕松的完成類似DAG作業的模式,這樣我們就可以通過一個提交來完成原來需要提交2次的任務,大大簡化了任務的繁瑣度。具有依賴式的作業提交后,hadoop會根據依賴的關系,先后執行的job任務,每個任務的運行都是獨立的。?
Let's look at my example below, which combines a word-frequency count with a sort. The test data is as follows:
秦東亮;72
秦東亮;34
秦東亮;100
三劫;899
三劫;32
三劫;1
a;45
b;567
b;12
The code is as follows:
package com.qin.test.hadoop.jobctrol;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyHadoopControl {

    // Job 1 mapper: splits each "name;count" line and emits (name, count)
    private static class SumMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text t = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String data = value.toString();
            String words[] = data.split(";");
            if (words[0].trim() != null) {
                // leading space lets job 2's mapper split the line on a space
                t.set(" " + words[0]);
                one.set(Integer.parseInt(words[1]));
                context.write(t, one);
            }
        }
    }

    // Job 1 reducer: sums the counts per name and emits (sum, name)
    private static class SumReduce extends Reducer<Text, IntWritable, IntWritable, Text> {

        private IntWritable iw = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : value) {
                sum += count.get();
            }
            iw.set(sum);
            context.write(iw, key);
        }
    }

    // Job 2 mapper: reads job 1's output and emits (count, name) for sorting
    private static class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

        IntWritable iw = new IntWritable();
        private Text t = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String words[] = value.toString().split(" ");
            System.out.println("Array length: " + words.length);
            System.out.println("Text read by map: " + value.toString());
            System.out.println("=====> " + words[0] + " =====>" + words[1]);
            if (words[0] != null) {
                iw.set(Integer.parseInt(words[0].trim()));
                t.set(words[1].trim());
                context.write(iw, t);
            }
        }
    }

    // Job 2 reducer: writes (name, count) in the sorted key order
    private static class SortReduce extends Reducer<IntWritable, Text, Text, IntWritable> {

        @Override
        protected void reduce(IntWritable key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            for (Text t : value) {
                context.write(t, key);
            }
        }
    }

    // Comparator that negates IntWritable's natural order, giving a descending sort
    public static class DescSort extends WritableComparator {

        public DescSort() {
            super(IntWritable.class, true);
        }

        @Override
        public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
                int arg4, int arg5) {
            return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);
        }

        @Override
        public int compare(Object a, Object b) {
            return -super.compare(a, b);
        }
    }

    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(MyHadoopControl.class);
        conf.set("mapred.job.tracker", "192.168.75.130:9001");
        conf.setJar("tt.jar");

        System.out.println("Mode: " + conf.get("mapred.job.tracker"));

        // Job 1: word-frequency count
        Job job1 = new Job(conf, "Join1");
        job1.setJarByClass(MyHadoopControl.class);

        job1.setMapperClass(SumMapper.class);
        job1.setReducerClass(SumReduce.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);

        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(Text.class);

        ControlledJob ctrljob1 = new ControlledJob(conf);
        ctrljob1.setJob(job1);

        FileInputFormat.addInputPath(job1, new Path("hdfs://192.168.75.130:9000/root/input"));
        FileSystem fs = FileSystem.get(conf);

        Path op = new Path("hdfs://192.168.75.130:9000/root/op");
        if (fs.exists(op)) {
            fs.delete(op, true);
            System.out.println("Output path already exists; deleted!");
        }
        FileOutputFormat.setOutputPath(job1, op);

        // Job 2: sort by count, descending
        Job job2 = new Job(conf, "Join2");
        job2.setJarByClass(MyHadoopControl.class);

        job2.setMapperClass(SortMapper.class);
        job2.setReducerClass(SortReduce.class);

        job2.setSortComparatorClass(DescSort.class);

        job2.setMapOutputKeyClass(IntWritable.class);
        job2.setMapOutputValueClass(Text.class);

        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);

        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);

        // job 2 consumes job 1's output, so declare the dependency
        ctrljob2.addDependingJob(ctrljob1);

        FileInputFormat.addInputPath(job2, new Path("hdfs://192.168.75.130:9000/root/op/part*"));
        FileSystem fs2 = FileSystem.get(conf);

        Path op2 = new Path("hdfs://192.168.75.130:9000/root/op2");
        if (fs2.exists(op2)) {
            fs2.delete(op2, true);
            System.out.println("Output path already exists; deleted!");
        }
        FileOutputFormat.setOutputPath(job2, op2);

        // Wire both controlled jobs into a JobControl and run it on a thread
        JobControl jobCtrl = new JobControl("myctrl");

        jobCtrl.addJob(ctrljob1);
        jobCtrl.addJob(ctrljob2);

        Thread t = new Thread(jobCtrl);
        t.start();

        while (true) {
            if (jobCtrl.allFinished()) {
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
            if (jobCtrl.getFailedJobList().size() > 0) {
                System.out.println(jobCtrl.getFailedJobList());
                jobCtrl.stop();
                break;
            }
        }
    }
}
The run log is as follows:
Mode: 192.168.75.130:9001
Output path already exists; deleted!
Output path already exists; deleted!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
[job name:  Join1
job id: myctrl0
job state:  SUCCESS
job mapred id:  job_201405092039_0001
job message:    just initialized
job has no depending job:
, job name: Join2
job id: myctrl1
job state:  SUCCESS
job mapred id:  job_201405092039_0002
job message:    just initialized
job has 1 dependeng jobs:
     depending job 0:   Join1
]
The processed result is as follows:
三劫    932
b       579
秦東亮  206
a       45
As you can see, the result is correct and the program ran successfully. The above is just my test combining two MapReduce jobs; larger combinations work the same way.
Summary: when configuring multiple jobs, write each Job's configuration separately; don't casually copy one and modify it, which makes mistakes far too easy. When I set this up, I copied a job's configuration and missed changing one spot, so it kept failing at runtime, and only at the end did I find the unmodified line. This is worth watching out for.
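One way to follow that advice, sketched below with an assumed helper name (buildCountJob is mine, not from the original code): give each stage its own builder method that works on a fresh copy of the base Configuration, so two jobs never share or half-edit the same settings object.

// Hypothetical helper for job 1; a sibling buildSortJob would configure job 2.
// Needs import org.apache.hadoop.conf.Configuration in addition to the imports above.
private static Job buildCountJob(Configuration baseConf, Path in, Path out) throws IOException {
    // new Configuration(baseConf) copies the settings, so later edits stay local
    Job job = new Job(new Configuration(baseConf), "count");
    job.setJarByClass(MyHadoopControl.class);
    job.setMapperClass(SumMapper.class);
    job.setReducerClass(SumReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, in);
    FileOutputFormat.setOutputPath(job, out);
    return job;
}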