MapReduce入门
?
?
?
說明
- MapReduce是一種分布式計算模型,解決海量數據的計算問題,主要有Map和Reduce組成
- 用戶使用時需要實現map()和reduce()兩個函數,兩個函數的形參都是key/value鍵值對
- 若以eclipse為開發環境,運行時出現內存不足的情況,需要修改虛擬機的參數 (例如把Default VM Arguments修改為 –Xms64m?? -Xmx128m)
?
?
?
?
MapReduce原理
-
如果block的大小默認是64MB,假設輸入文件有兩個,一個32MB,一個72MB,則小的文件時一個輸入片,大文件會分為兩個數據塊,是兩個輸入片,一共三個輸入片每一個輸入片由一個Mapper進程處理,所以一共三個Mapper進程處理
?
?
?
MapReduce執行流程
?
運行時通過Mapper讀取HDFS文件,執行自己的方法,最后輸出到HDFS文件中
hadoop中,map函數位于內置類org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>中(Mapper.java?? 122行)
reduce函數位于內置類org.apache.hadoop.mapreduce.Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>中(Reducer.java?? 153行)
?
?
?
JobTracker和TaskTracker
- JobTracker:負責接收用戶提交的作業,負責啟動、跟蹤任務執行(JobClient是用戶作業與JobTracker交互的主要接口)
- TaskTracker:負責任務的執行
?
?
?
Mapper 和 Reducer
- 每個Mapper任務對應一個java進程,它會讀取HDFS文件,解析成許多鍵值對,經過我們重寫的map方法處理后,轉換為很多鍵值對再輸出
?具體分為下面6個階段:
?
- reducer執行過程: ?每個Reducer任務是一個java進程,接收Mapper任務的輸出,歸約后寫入HDFS中
過程如下:
?
?
?
接口
- 默認設置
?
- writable接口
1.?MapReduce的任意Key和Value必須實現Writable接口.
2. 兩個方法 write和readFileds方法
write方法將對象序列化到DataOutput中
readFields從DataInput中將對象反序列化到對象的屬性中
3.常用的writable實現類:(其中Text類似于java.lang.String)
?
?
- InputFormat ?輸入文件格式化類
- FileInputFormat是InputFormat的子類,是所有以文件作為數據源的InputFormat實現的基類
- FileInputFormat只劃分比HDFS block大的文件,如果一個文件比block小將不會被劃分,每一個小文件會被當做一個split并分配一個map任務
?
- TextInputFormat
- 繼承自FileInputFormat
- 默認的處理類,處理普通文本文件
- 文件中每一行作為一個記錄(將每一行在文件中的起始偏移量作為key,每一行的內容作為value。)
- 默認以\n或回車鍵作為一行記錄
?
- OutputFormat 輸出文件格式化
- ? TextOutputformat
?????? 默認的輸出格式,key和value中間值用tab隔開的。
- ? SequenceFileOutputformat
?????? 將key和value以sequencefile格式輸出。
- ? ? SequenceFileAsOutputFormat
?????? 將key和value以原始二進制的格式輸出。
- ? ? MapFileOutputFormat
?????? 將key和value寫入MapFile中。由于MapFile中的key是有序的,所以寫入的時候必須保證記錄是按key值順序寫入的。
- ? ?MultipleOutputFormat
?????? 默認情況下一個reducer會產生一個輸出,但是有些時候我們想一個reducer產生多個輸出,MultipleOutputFormat和MultipleOutputs可以實現這個功能。
?
?
?
簡單例子:wordcount
?
- 重寫map方法:
?
- 重寫reduce方法
- public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable >{final IntWritable value3=new IntWritable(0);//key3,表示單詞出現的總次數//key表示單詞,values表示map方法輸出的1的集合,context為上下文對象protected void reduce(Text key,java.lang.Iterable<IntWritable> values,Context context)throws java.io.IOException,InterruptedException{int sum=0;for(IntWritable count:values){sum+=count.get();}//執行到這里,sum表示該單詞出現的總次數//key3表示單詞,是最后輸出的keyfinal Text key3=key;//value3表示單詞出現的總次數,是最后出現的value
value3.set(sum);context.write(key3,value3);}}
?
- 調用:
- public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException{// TODO Auto-generated method stub final String INPUT_PATH="hdfs://hadoop:9000/input";//輸入路徑final String OUTPUT_PATH="hdfs://hadoop:9000/output";//輸出路徑,必須是不存在的Configuration conf = new Configuration();//加載配置文件final Job job = new Job(conf,"WordCountApp");//創建一個job對象,封裝運行時所需要的所有信息,可以提交到hadoop獨立地運行job.setJarByClass(WordCountApp.class);//需要打X成jar包的話,加這一句FileInputFormat.setInputPaths(job,INPUT_PATH);//告訴job執行作業時輸入文件的路徑job.setInputFormatClass(TextInputFormat.class);//設置把輸入文件處理成鍵值對的類 job.setMapperClass(MyMapper.class);//設置自定義的Mapper類job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//設置map方法輸出的k2、v2的類型//job.setCombinerClass(MyReducer.class);job.setPartitionerClass(HashPartitioner.class);//設置對k2分區的類job.setNumReduceTasks(1);//設置運行的Reducer任務的數量job.setReducerClass(MyReducer.class);//設置自定義的Reducer類 FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));//告訴job執行作業的輸出路徑job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//設置reduce方法輸出的k3、v3的類型 job.waitForCompletion(true);//讓作業運行,直到運行結束,程序退出,把job對象提交給hadoop運行,直到作業運行結束后才可以}
?
- input文件內容如下:
?
- 運行日志如下:
?
14/05/18 12:08:04 INFO mapred.JobClient: Running job: job_local_0001
14/05/18 12:08:04 INFO mapred.Task: Using ResourceCalculatorPlugin : null
14/05/18 12:08:04 INFO mapred.MapTask: io.sort.mb = 100
14/05/18 12:08:04 INFO mapred.MapTask: data buffer = 79691776/99614720
14/05/18 12:08:04 INFO mapred.MapTask: record buffer = 262144/327680
14/05/18 12:08:04 INFO mapred.MapTask: Starting flush of map output
14/05/18 12:08:04 INFO mapred.MapTask: Finished spill 0
14/05/18 12:08:04 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/05/18 12:08:04 INFO mapred.LocalJobRunner:
14/05/18 12:08:04 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/05/18 12:08:04 INFO mapred.Task: Using ResourceCalculatorPlugin : null
14/05/18 12:08:04 INFO mapred.LocalJobRunner:
14/05/18 12:08:04 INFO mapred.Merger: Merging 1 sorted segments
14/05/18 12:08:04 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 499 bytes
14/05/18 12:08:04 INFO mapred.LocalJobRunner:
14/05/18 12:08:04 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/05/18 12:08:04 INFO mapred.LocalJobRunner:
14/05/18 12:08:04 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/05/18 12:08:04 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/output
14/05/18 12:08:04 INFO mapred.LocalJobRunner: reduce > reduce
14/05/18 12:08:04 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/05/18 12:08:05 INFO mapred.JobClient: map 100% reduce 100%
14/05/18 12:08:05 INFO mapred.JobClient: Job complete: job_local_0001
14/05/18 12:08:05 INFO mapred.JobClient: Counters: 19
14/05/18 12:08:05 INFO mapred.JobClient: File Output Format Counters
14/05/18 12:08:05 INFO mapred.JobClient: Bytes Written=51
14/05/18 12:08:05 INFO mapred.JobClient: FileSystemCounters
14/05/18 12:08:05 INFO mapred.JobClient: FILE_BYTES_READ=781
14/05/18 12:08:05 INFO mapred.JobClient: HDFS_BYTES_READ=456
14/05/18 12:08:05 INFO mapred.JobClient: FILE_BYTES_WRITTEN=130038
14/05/18 12:08:05 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=51
14/05/18 12:08:05 INFO mapred.JobClient: File Input Format Counters
14/05/18 12:08:05 INFO mapred.JobClient: Bytes Read=228
14/05/18 12:08:05 INFO mapred.JobClient: Map-Reduce Framework
14/05/18 12:08:05 INFO mapred.JobClient: Map output materialized bytes=503
14/05/18 12:08:05 INFO mapred.JobClient: Map input records=13
14/05/18 12:08:05 INFO mapred.JobClient: Reduce shuffle bytes=0
14/05/18 12:08:05 INFO mapred.JobClient: Spilled Records=94
14/05/18 12:08:05 INFO mapred.JobClient: Map output bytes=403
14/05/18 12:08:05 INFO mapred.JobClient: Total committed heap usage (bytes)=1065484288
14/05/18 12:08:05 INFO mapred.JobClient: SPLIT_RAW_BYTES=89
14/05/18 12:08:05 INFO mapred.JobClient: Combine input records=0
14/05/18 12:08:05 INFO mapred.JobClient: Reduce input records=47
14/05/18 12:08:05 INFO mapred.JobClient: Reduce input groups=8
14/05/18 12:08:05 INFO mapred.JobClient: Combine output records=0
14/05/18 12:08:05 INFO mapred.JobClient: Reduce output records=8
14/05/18 12:08:05 INFO mapred.JobClient: Map output records=47
?
- 運行結果:
Today 4
a 4
day 4
happy 9
is 4
nice 4
wish 9
you 9
?
轉載于:https://www.cnblogs.com/wishyouhappy/p/3735044.html
總結
以上是生活随笔為你收集整理的MapReduce入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux date命令的用法
- 下一篇: 作为一个web开发人员,哪些技术细节是在