Mapreduce和Yarn概念,参数优化,作用,原理,MapReduce计数器 Counter,MapReduce 多job串联之ControlledJob(来自学习资料)
3.3. MapReduce與YARN
3.3.1 YARN概述
Yarn是一個資源調度平臺,負責為運算程序提供服務器運算資源,相當于一個分布式的操作系統平臺,而mapreduce等運算程序則相當于運行于操作系統之上的應用程序
3.3.2 YARN的重要概念
1、? yarn并不清楚用戶提交的程序的運行機制
2、? yarn只提供運算資源的調度(用戶程序向yarn申請資源,yarn就負責分配資源)
3、? yarn中的主管角色叫ResourceManager
4、? yarn中具體提供運算資源的角色叫NodeManager
5、? 這樣一來,yarn其實就與運行的用戶程序完全解耦,就意味著yarn上可以運行各種類型的分布式運算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序,tez ……
6、? 所以,spark、storm等運算框架都可以整合在yarn上運行,只要他們各自的框架中有符合yarn規范的資源請求機制即可
7、? Yarn就成為一個通用的資源調度平臺,從此,企業中以前存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數據共享
?
3.3.3 Yarn中運行運算程序的示例
mapreduce程序的調度過程,如下圖
?
?
Mapreduce的其他補充
5.1 計數器應用
在實際生產代碼中,常常需要將數據處理過程中遇到的不合規數據行進行全局計數,類似這種需求可以借助mapreduce框架中提供的全局計數器來實現
示例代碼如下:
public class MultiOutputs { ???????? //通過枚舉形式定義自定義計數器 ???????? enum MyCounter{MALFORORMED,NORMAL} ? ???????? static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> { ? ???????? ???????? @Override ???????? ???????? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { ? ???????? ???????? ???????? String[] words = value.toString().split(","); ? ???????? ???????? ???????? for (String word : words) { ???????? ???????? ???????? ???????? context.write(new Text(word), new LongWritable(1)); ???????? ???????? ???????? } ???????? ???????? ???????? //對枚舉定義的自定義計數器加1 ???????? ???????? ???????? context.getCounter(MyCounter.MALFORORMED).increment(1); ???????? ???????? ???????? //通過動態設置自定義計數器加1 ???????? ???????? ???????? context.getCounter("counterGroupa", "countera").increment(1); ???????? ???????? } ? ???????? } |
?
5.2 多job串聯
一個稍復雜點的處理邏輯往往需要多個mapreduce程序串聯處理,多job的串聯可以借助mapreduce框架的JobControl實現
?
示例代碼:
????? ??ControlledJob cJob1 = new ControlledJob(job1.getConfiguration()); ??????? ControlledJob cJob2 = new ControlledJob(job2.getConfiguration()); ??????? ControlledJob cJob3 = new ControlledJob(job3.getConfiguration()); ?????? ??????? cJob1.setJob(job1); ??????? cJob2.setJob(job2); ??????? cJob3.setJob(job3); ? ??????? // 設置作業依賴關系 ??????? cJob2.addDependingJob(cJob1); ??????? cJob3.addDependingJob(cJob2); ? ??????? JobControl jobControl = new JobControl("RecommendationJob"); ??????? jobControl.addJob(cJob1); ??????? jobControl.addJob(cJob2); ??????? jobControl.addJob(cJob3); ? ? ??????? // 新建一個線程來運行已加入JobControl中的作業,開始進程并等待結束 ??????? Thread jobControlThread = new Thread(jobControl); ??????? jobControlThread.start(); ????? ??while (!jobControl.allFinished()) { ??????????? Thread.sleep(500); ??????? } ??????? jobControl.stop(); ? ??????? return 0; |
?
?Mapreduce參數優化:
mapreduce參數優化
MapReduce重要配置參數
11.1 資源相關參數
//以下參數是在用戶自己的mr應用程序中配置就可以生效
(1) mapreduce.map.memory.mb:一個Map Task可使用的資源上限(單位:MB),默認為1024。如果Map Task實際使用的資源量超過該值,則會被強制殺死。
(2) mapreduce.reduce.memory.mb:一個Reduce Task可使用的資源上限(單位:MB),默認為1024。如果Reduce Task實際使用的資源量超過該值,則會被強制殺死。
(3) mapreduce.map.cpu.vcores:每個Map task可使用的最多cpu core數目, 默認值: 1
(4) mapreduce.reduce.cpu.vcores:每個Reduce task可使用的最多cpu core數目, 默認值: 1
?
//shuffle性能優化的關鍵參數,應在yarn啟動之前就配置好
(12) mapreduce.task.io.sort.mb?? 100????????//shuffle的環形緩沖區大小,默認100m
(13) mapreduce.map.sort.spill.percent?? 0.8???//環形緩沖區溢出的閾值,默認80%
?
?
?
(5) mapreduce.map.java.opts: Map Task的JVM參數,你可以在此配置默認的java heapsize等參數, e.g.
“-Xmx1024m -verbose:gc-Xloggc:/tmp/@taskid@.gc” (@taskid@會被Hadoop框架自動換為相應的taskid), 默認值: “”
(6) mapreduce.reduce.java.opts: Reduce Task的JVM參數,你可以在此配置默認的java heapsize等參數, e.g.
“-Xmx1024m -verbose:gc-Xloggc:/tmp/@taskid@.gc”, 默認值: “”
?
?
?
?
?
//應該在yarn啟動之前就配置在服務器的配置文件中才能生效
(7) yarn.scheduler.minimum-allocation-mb??? ?1024?? 給應用程序container分配的最小內存
(8) yarn.scheduler.maximum-allocation-mb?? ? 8192???? 給應用程序container分配的最大內存
(9)yarn.scheduler.minimum-allocation-vcores?????? 1??????
(10)yarn.scheduler.maximum-allocation-vcores???? 32
(11)yarn.nodemanager.resource.memory-mb?? 8192??每臺NodeManager最大可用內存
yarn.nodemanager.resource.cpu-vcores??? 8??? 每臺NodeManager最大可用cpu核數
?
?
11.2 容錯相關參數
(1) mapreduce.map.maxattempts: 每個Map Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。
(2) mapreduce.reduce.maxattempts: 每個Reduce Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。
(3) mapreduce.map.failures.maxpercent: 當失敗的Map Task失敗比例超過該值為,整個作業則失敗,默認值為0. 如果你的應用程序允許丟棄部分輸入數據,則該該值設為一個大于0的值,比如5,表示如果有低于5%的Map Task失敗(如果一個Map Task重試次數超過mapreduce.map.maxattempts,則認為這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個作業扔認為成功。
(4) mapreduce.reduce.failures.maxpercent: 當失敗的Reduce Task失敗比例超過該值為,整個作業則失敗,默認值為0.
(5) mapreduce.task.timeout: Task超時時間,經常需要設置的一個參數,該參數表達的意思為:如果一個task在一定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認為該task處于block狀態,可能是卡住了,也許永遠會卡主,為了防止因為用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。如果你的程序對每條輸入數據的處理時間過長(比如會訪問數據庫,通過網絡拉取數據等),建議將該參數調大,該參數過小常出現的錯誤提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after300 secsContainer killed by the ApplicationMaster.”。
11.3 本地運行mapreduce 作業
設置以下幾個參數:
mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=local
11.4 效率和穩定性相關參數
(1) mapreduce.map.speculative: 是否為Map Task打開推測執行機制,默認為false
(2) mapreduce.reduce.speculative: 是否為Reduce Task打開推測執行機制,默認為false
(3) mapreduce.job.user.classpath.first& mapreduce.task.classpath.user.precedence:當同一個class同時出現在用戶jar包和hadoop jar中時,優先使用哪個jar包中的class,默認為false,表示優先使用hadoop jar中的class。
(4)mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片時的最小切片大小,(5)mapreduce.input.fileinputformat.split.maxsize:? FileInputFormat做切片時的最大切片大小
(切片的默認大小就等于blocksize,即 134217728)
?
?
總結
以上是生活随笔為你收集整理的Mapreduce和Yarn概念,参数优化,作用,原理,MapReduce计数器 Counter,MapReduce 多job串联之ControlledJob(来自学习资料)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hadoop中通过ToolRunner和
- 下一篇: 文件夹字体怎么变大win10 Win10