On Custom Combine Classes in MapReduce (1)
In MRJobConfig there is a constant:

    public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";

Its sub-interface JobContext declares a getCombinerClass method, and the implementing class JobContextImpl provides it:

    public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
        throws ClassNotFoundException {
      return (Class<? extends Reducer<?,?,?,?>>)
        conf.getClass(COMBINE_CLASS_ATTR, null);
    }

JobContext extends MRJobConfig, and JobContextImpl implements JobContext, so the COMBINE_CLASS_ATTR constant defined in MRJobConfig is visible here.

The subclass Job provides the setter:

    public void setCombinerClass(Class<? extends Reducer> cls)
        throws IllegalStateException {
      ensureState(JobState.DEFINE);
      conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
    }

Job extends JobContextImpl, which carries the MRJobConfig constants, so Job also sees COMBINE_CLASS_ATTR, and setCombinerClass stores the combiner class into the job configuration under that key.

The relevant hierarchy, in outline:

    MRJobConfig
        sub-interface JobContext               -- declares getCombinerClass
            implementing class JobContextImpl  -- implements getCombinerClass
                subclass Job                   -- setCombinerClass writes COMBINE_CLASS_ATTR
            sub-interface TaskAttemptContext   -- inherits getCombinerClass

Inside Task there is an inner class CombinerRunner ("$" denotes an inner class below), and that inner class has a create method:

    public static <K,V> CombinerRunner<K,V> create(JobConf job,
                       TaskAttemptID taskId,
                       Counters.Counter inputCounter,
                       TaskReporter reporter,
                       org.apache.hadoop.mapreduce.OutputCommitter committer
                      ) throws ClassNotFoundException {
      Class<? extends Reducer<K,V,K,V>> cls =
        (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();

      if (cls != null) {
        return new OldCombinerRunner(cls, job, inputCounter, reporter);
      }
      // make a task context so we can get the classes
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId, reporter);
      Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
        (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
          taskContext.getCombinerClass();
      if (newcls != null) {
        return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,
                                          inputCounter, reporter, committer);
      }
      return null;
    }

The first branch, which consults job.getCombinerClass() and returns an OldCombinerRunner, handles the old (mapred) API. The second branch, which builds a TaskAttemptContextImpl, asks it for the combiner class and returns a NewCombinerRunner, handles the new (mapreduce) API. (I am not sure why everything is written with fully qualified names; strip the package prefixes, the up/down casts and all the generics, and it reads much more clearly.)
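Stripped of the class hierarchy, the setter/getter pair above really just writes and reads one configuration property. Below is a minimal, runnable sketch of that round trip; the class names CombineAttrDemo and DummyCombiner are invented for the example, and only the property key and the setClass/getClass calls come from the source shown above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class CombineAttrDemo {

        // Stand-in combiner class, only so the demo is self-contained.
        public static class DummyCombiner
                extends Reducer<Text, IntWritable, Text, IntWritable> {
        }

        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // What Job.setCombinerClass(cls) boils down to:
            conf.setClass("mapreduce.job.combine.class", DummyCombiner.class, Reducer.class);

            // What JobContextImpl.getCombinerClass() boils down to:
            Class<?> combiner = conf.getClass("mapreduce.job.combine.class", null);

            System.out.println(combiner);   // prints the DummyCombiner class
        }
    }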
TaskAttemptContext is a sub-interface of JobContext, so it inherits the getCombinerClass declaration. Through polymorphism, the call actually dispatches to the getCombinerClass of the implementing class TaskAttemptContextImpl (TaskAttemptContextImpl extends JobContextImpl, and JobContextImpl implements the method). The call therefore reads COMBINE_CLASS_ATTR, i.e. the class xxxC we registered with job.setCombinerClass. That xxxC becomes newcls, which is passed to the reducerClass parameter of the NewCombinerRunner constructor:

    NewCombinerRunner(Class reducerClass,
                      JobConf job,
                      org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                      org.apache.hadoop.mapreduce.TaskAttemptContext context,
                      Counters.Counter inputCounter,
                      TaskReporter reporter,
                      org.apache.hadoop.mapreduce.OutputCommitter committer) {
      super(inputCounter, job, reporter);
      this.reducerClass = reducerClass;
      this.taskId = taskId;
      keyClass = (Class<K>) context.getMapOutputKeyClass();
      valueClass = (Class<V>) context.getMapOutputValueClass();
      comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
      this.committer = committer;
    }

On the map side: Task → MapTask → the inner class MapOutputBuffer holds

    private CombinerRunner<K,V> combinerRunner;

and in the spill path ($SpillThread, "$" again marking an inner class) the runner is created and used:

    combinerRunner = CombinerRunner.create(job, getTaskID(),
                                           combineInputCounter,
                                           reporter, null);
    // at this point combinerRunner wraps the combine class we configured

    if (combinerRunner == null) {
      // spill directly
      DataInputBuffer key = new DataInputBuffer();
      while (spindex < mend &&
             kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
        final int kvoff = offsetFor(spindex % maxRec);
        int keystart = kvmeta.get(kvoff + KEYSTART);
        int valstart = kvmeta.get(kvoff + VALSTART);
        key.reset(kvbuffer, keystart, valstart - keystart);
        getVBytesForOffset(kvoff, value);
        writer.append(key, value);
        ++spindex;
      }
    } else {
      int spstart = spindex;
      while (spindex < mend &&
             kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
        ++spindex;
      }
      // Note: we would like to avoid the combiner if we've fewer
      // than some threshold of records for a partition
      if (spstart != spindex) {
        combineCollector.setWriter(writer);
        RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
        combinerRunner.combine(kvIter, combineCollector);
      }
    }

Next, the combine method itself, in Task's inner class NewCombinerRunner:

    public void combine(RawKeyValueIterator iterator,
                        OutputCollector<K,V> collector)
        throws IOException, InterruptedException, ClassNotFoundException {
      // make a reducer
      org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
        (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
          ReflectionUtils.newInstance(reducerClass, job);
      org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
        createReduceContext(reducer, job, taskId,
                            iterator, null, inputCounter,
                            new OutputConverter(collector),
                            committer,
                            reporter, comparator, keyClass,
                            valueClass);
      reducer.run(reducerContext);
    }

The reducerClass here is the xxxC we passed in. An instance of xxxC is created by reflection and upcast to a Reducer, and run is then called on that instance. Our xxxC does not override run, so what executes is the run inherited from Reducer:

    /**
     * Advanced application writers can use the
     * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
     * control how the reduce task works.
     */
    public void run(Context context) throws IOException, InterruptedException {
      setup(context);
      try {
        while (context.nextKey()) {
          reduce(context.getCurrentKey(), context.getValues(), context);
          // If a back up store is used, reset it
          Iterator<VALUEIN> iter = context.getValues().iterator();
          if (iter instanceof ReduceContext.ValueIterator) {
            ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
          }
        }
      } finally {
        cleanup(context);
      }
    }

Because of polymorphism, the reduce called inside this loop is the reduce of the subclass xxxC (when a subclass overrides a method, the subclass's version is what actually executes).
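The mechanism that makes this work can be shown in isolation. The following is a minimal, self-contained plain-Java sketch (not Hadoop code; every name in it is invented): a subclass instance is created reflectively, held as the base type, and an inherited template method is called, so the overridden hook in the subclass is what executes.

    // BaseRunner stands in for Reducer, MyRunner for our combine class xxxC.
    abstract class BaseRunner {
        public void run() {                 // like Reducer.run(context)
            step();                         // run() calls the hook, like run() calls reduce()
        }
        protected abstract void step();     // like reduce(...)
    }

    class MyRunner extends BaseRunner {
        @Override
        protected void step() {
            System.out.println("the subclass's step() is what executes");
        }
    }

    public class DispatchDemo {
        public static void main(String[] args) throws ReflectiveOperationException {
            Class<? extends BaseRunner> cls = MyRunner.class;           // what getCombinerClass hands back
            BaseRunner runner = cls.getDeclaredConstructor().newInstance(); // like ReflectionUtils.newInstance
            runner.run();   // run() is inherited; step() resolves to MyRunner.step()
        }
    }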
So when we write the class used for combine, it should extend Reducer and override reduce. Its input form (taking wordcount as an example) is

    reduce(Text key, Iterable<IntWritable> values, Context context)

where key is the word and values is the list of counts for that word (value1, value2, ...). Note that by this point the input is already grouped, i.e. <key, list<value1, value2, value3, ...>>.

(I reached this conclusion because the combine class I used was WCReduce, i.e. the combiner and the reducer were the same class. Working through the code: if the combiner received plain <key, value> pairs, it could not possibly do what a combiner does, namely merge records with the same key and accumulate the counts; those are really two steps. Pairs with equal keys are first grouped on the map side into <key, value list>, and only then does the combiner receive input in that form and process it. In our case the processing is a sum, which is written to context and then flows into the reduce-side shuffle. When I additionally looped over the values with System.out.println, the final result came out as 0, simply because after that first pass the iterator no longer points where the summing loop expects it to.)

Try the code below for yourself, keep combining and commenting lines in and out, and you will arrive at the same conclusions.

    // reduce
    public static class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable ValueOut = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                System.out.println(value.get() + "--");
            }
//            int total = 0;
//            for (IntWritable value : values) {
//                total += value.get();
//            }
//            ValueOut.set(total);
//            context.write(key, ValueOut);
        }
    }

    job.setCombinerClass(WCReduce.class);
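For context, here is a sketch of how that setCombinerClass call typically sits in a wordcount driver. WordCountDriver and WCMapper are names made up for this sketch; it assumes the WCReduce class above (with the summing lines re-enabled and the values iterated only once) is visible to the driver.

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {

        // Standard word-count mapper: emits <word, 1> for every token.
        public static class WCMapper extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private final Text word = new Text();

            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCountDriver.class);

            job.setMapperClass(WCMapper.class);
            job.setCombinerClass(WCReduce.class);   // map-side combine, the class analysed above
            job.setReducerClass(WCReduce.class);    // the same class reused as the reducer

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

Reusing the reduce class as the combiner is only safe here because summing is associative and commutative and because WCReduce's output types match its input types: the partial sums produced during map-side spills are simply summed again on the reduce side.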
Reposted from: https://www.cnblogs.com/xuanlvshu/p/5744445.html