Hadoop Map/Reduce的工作流
問題描述
我們的數據分析平臺是單一的Map/Reduce過程,由于半年來不斷地增加需求,導致了問題已經不是那么地簡單,特別是在Reduce階段,一些大對象會常駐內存。因此越來越頂不住壓力了,當前內存問題已經是最大的問題,每個Map占用5G,每個Reduce占用9G!直接導致當數據分析平臺運行時,集群處于資源匱乏狀態。
?
因此,在不改變業務數據計算的條件下,將單一的Map/Reduce過程分解成2個階段。這個時候,需求就相對來說比較復雜,將第一階段的Reduce結果輸出至HDFS,作為第二階段的輸入。
?
其基本過程圖很簡單如下所示:
我們可以使用啟動兩個Job,在第一個階段Job完成之后,再進行第二階段Job的執行。但是更好的方式是使用Hadoop中提供的JobControl工具,這個工具可以加入多個等待執行的子Job,并定義其依賴關系,決定執行的先后順序。
?
JobControl jobControl = new JobControl(GROUP_NAME);JobConf phase1JobConf = Phase1Main.getJobConf(getConf(), jsonConfigFilePhase1, reduceCountOne);Job phase1Job = new Job(phase1JobConf);jobControl.addJob(phase1Job);JobConf phase2JobConf = Phase2Main.getJobConf(getConf(), jsonConfigFilePhase2, reduceCountTwo);Job phase2Job = new Job(phase2JobConf);jobControl.addJob(phase2Job);phase2Job.addDependingJob(phase1Job);jobControl.run();return jobControl.getFailedJobs() == null || jobControl.getFailedJobs().isEmpty() ? 0 : 1;?
正如代碼所示,可以使用job的addDependingJob(JobConf)方法來定義其依賴關系。
?
但是這種方式有一個非常大的缺點,如果中間數據結果過大,將其放置在HDFS上是非常浪費磁盤資源,同時也帶來后續過多的I/O操作,包括第一階段的寫磁盤和第二階段的讀磁盤(而且本身中間結果數據也沒有什么太大用途)。
?
經過查閱,在Hadoop中,一個Job可以按順序執行多個mapper對數據進行前期的處理,再進行Reduce,Reduce執行完成后,還可以繼續執行多個Mapper,形成一個處理鏈結構,這樣的Job是不會存儲中間結果的,大大減少了磁盤I/O操作。
?
但這種方式也對map/reduce程序有個要求,就是只能存在一個Partition規則,因為整個鏈條中只會存在一次Reduce操作。前文介紹的那兩個階段的Partition規則如果不一致,是不能改造成這種方式的。
?
這種方式的大致流程圖如下:
?
?
?
由于我們的分析程序中,第二步就需要根據一定的規則進行聚集,因此第二步就需要進行Reduce,將原來第四步的Reduce階段強行改造成Map階段。注意,Map階段之間互相傳遞數據時,其數量是固定的,而且不會進行聚集(Reduce)操作,還是需要按照流的方式進行處理,因此最好要先排序。單個Map的結果只會傳遞給特定的單個下個步驟的Map端。
?
在ChainMain類中會執行這種方式,需要借助于ChainMapper和ChainReducer兩個Hadoop中提供的類:
String finalJobName = TongCommonConstants.JOB_NAME + jobNameSuffix;jobConf.setJobName(finalJobName);jobConf.setInputFormat(RawLogInputFormat.class);jobConf.setPartitionerClass(Phase1Partitioner.class);jobConf.setNumReduceTasks(reduceCountTwo);jobConf.set(TongCommonConstants.DIC_INFO, jsonConfigFile);DicInfoManager.getInstance().readDicManager(jobConf, jsonConfigFile);String yesterdayOutDir = DicInfoManager.getInstance().getDicManager().getPrevious_day_output_path();JobConf phase1JobConf = getJobConf(jsonConfigFile, yesterdayOutDir);ChainMapper.addMapper(jobConf, Phase1Mapper.class, Text.class, History.class, Phase1KeyDecorator.class,BytesWritable.class, true, phase1JobConf);JobConf phase2ReducerConf = getJobConf(jsonConfigFile, yesterdayOutDir);ChainReducer.setReducer(jobConf, Phase1Reducer.class, Phase1KeyDecorator.class, BytesWritable.class,Text.class, Text.class, true, phase2ReducerConf);JobConf phase3ChainJobConf = getJobConf(jsonConfigFile, yesterdayOutDir);ChainReducer.addMapper(jobConf, Phase3ChainMapper.class, Text.class, Text.class, Phase2KeyDecorator.class,BytesWritable.class, true, phase3ChainJobConf);JobConf phase4ChainJobConf = getJobConf(jsonConfigFile, yesterdayOutDir);ChainReducer.addMapper(jobConf, Phase4ChainMapper.class, Phase2KeyDecorator.class, BytesWritable.class,Text.class, Text.class, true, phase4ChainJobConf);RunningJob runningJob = JobClient.runJob(jobConf);runningJob.waitForCompletion();return runningJob.isSuccessful() ? 0 : 1;?
經過這種方式的改造后,對原有程序的影響最小,因為不需要定義中間結果存儲地址,當然也不需要定義第二階段的配置文件。
?
新手比較容易犯的一個錯誤是,Reducer后面的map步驟要使用ChainReducer.addMapper方法而不是ChainMapper.addMapper方法,否則會抱下面的異常,我就在這個上面栽了跟頭,查了很久。
Exception in thread "main" java.lang.IllegalArgumentException: The specified Mapper input key class does not match the previous Mapper's output key class. at org.apache.hadoop.mapreduce.lib.chain.Chain.validateKeyValueTypes(Chain.java:695) at org.apache.hadoop.mapred.lib.Chain.addMapper(Chain.java:104)?
Hadoop工作流中的JobControl
很多情況下,用戶編寫的作業比較復雜,相互之間存在依賴關系,這種可以用有向圖表示的依賴關系稱之為“工作流”。
JobControl是由兩個類組成:Job和JobControl,Job的狀態轉移圖如下:
?
作業在剛開始的時候處于Waiting狀態,如果沒有依賴作業或者所有依賴作業都已經完成的情況下,進入Ready狀態;一旦進入Ready狀態,則作業可被提交到Hadoop集群上運行,并進入Running狀態,根據作業的運行情況,可能進入Success或Failed狀態。需要注意的是,如果一個作業的依賴作業失敗,則該作業也會失敗,后續的所有作業也都會失敗。
JobControl封裝了一系列MapReduce作業及其對應的依賴關系,它將處于不同狀態的作業放入不同的哈希表,按照Job的狀態轉移圖轉移作業,直到所有作業運行完成。在實現的時候,JobControl包含一個線程用于周期性地監控和更新各個作業的運行狀態,調度依賴作業運行完成的作業,提交Ready狀態的作業等。
ChainMapper/ChainReducer主要是為了解決線性鏈式Mapper而提出的,在Map或Reduce階段存在多個Mapper,像多個Linux管道一樣,前一個Mapper的輸出結果直接重定向到下一個Mapper的輸入,形成一個流水線,最后的Mapper或Reducer才會將結果寫到HDFS上。對于任意一個MapReduce作業,Map和Reduce階段可以由無限個Mapper,但只能有一個Reducer。
Hadoop MapReduce有一個約定,函數OutputCollector.collect(key, value)執行期間不能改變key和value的值,這是因為某個map/reduce調用該方法之后,可能后續繼續再次使用key和value的值,如果被改變,可能會造成潛在的錯誤。
ChainMapper/Reducer實現的關鍵技術點就是修改Mapper和Reducer的輸出流,將本來要寫入文件的輸出結果重定向到另外一個Mapper中。盡管鏈式作業在Map和Reduce階段添加了多個Mapper,但仍然只是一個MapReduce作業,因而只能有一個與之對應的JobConf對象。
ChainMapper中實現的map函數大概如下:
?
chain.getMapperCollector返回一個OutputCollector實現,即ChainOutputCollector,collector方法大概如下:
?
?在使用ChainMapper/ChainReducer時需要注意一個問題:就是其中參數byValue的選擇,究竟是該傳值還是傳遞引用。因為在Hadoop編程中需要處理的數據量比較大,經常使用復用同一個對象的情況,普通的Mapper/Reducer程序由于不會執行鏈式處理,在其他的JVM中來重建Map輸出的對象,而Chain API中需要管道一樣的操作來進行下一步處理,Mapper.map()函數調用完outputCollector.collect(key, value)之后,可能再次使用key和value的值,才導致這個問題的發生。
?
個人總覺得雖然重用引用的方式雖然可以節省一定的內存,但是不重用引用也僅僅會對Minor GC造成一定的壓力,如果嚴格控制生成的new對象Key,Value的生命周期的話。
?
正是為了防止OutputCollector直接對key/value進行修改,ChainMapper允許用戶指定key/value的傳遞方式,如果編寫的程序確定key/value執行期間不會被重用以修改(如果是不可變對象最好),則可以選擇按照引用來進行傳遞,否則按值傳遞。需要注意的是,引用傳遞可以避免對象的深層拷貝,提高處理效率,但需要編程時做出key/value不能修改的保證。
?
?
轉載于:https://www.cnblogs.com/mmaa/p/5789916.html
總結
以上是生活随笔為你收集整理的Hadoop Map/Reduce的工作流的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 创业思路(1) - 收藏夹分享平台
- 下一篇: 1980元 2022款蔚来ET7中控升级