04_Flink-HA高可用、Standalone集群模式、Flink-Standalone集群重要参数详解、集群节点重启及扩容、启动组件、Flink on Yarn、启动命令等
1.4.Flink集群安裝部署standalone+yarn
1.4.1.Standalone集群模式
1.4.2.Flink-Standalone集群重要參數詳解
1.4.3.集群節點重啟及擴容
1.4.3.1.啟動jobmanager
1.4.3.2.啟動taskmanger
1.4.3.3.Flink standalone集群中job的容錯
1.4.4.Flink on Yarn
1.4.4.1.原理介紹
1.4.4.2.FLINK on yarn集群部署
1.4.5.Flink on Yarn的兩種運行方式
1.4.5.1.第一種[yarn-session.sh(開辟資源)+flink run(提交任務)]
1.4.5.2.第二種[flink run -m yarn-cluster(開辟資源 + 提交任務)]
1.4.5.3…/bin/yarn-session.sh命令分析
1.4.5.4./bin/flink run命令分析
1.4.5.5.Flink在yarn上的分布
1.4.5.6.Flink on yarn內部實現
1.4.Flink集群安裝部署standalone+yarn
1.4.1.Standalone集群模式
Standalone集群架構展示:Client客戶端提交任務給JobManager,JobManager負責Flink集群計算資源管理,并分發任務給TaskManager執行,TaskManager定期向JobManager匯報狀態。
?依賴環境
?Jdk1.8及以上[配置JAVA_HOME環境變量]
?ssh免密登錄[集群內節點之間免密登錄]
?集群規劃
master(JobManager) + slave/worker(TaskManager)
hadoop4(master)
hadoop5(salve)
hadoop6(salve)
?集群安裝
A:修改conf/flink-conf.yaml
B:修改conf/masters文件內容:
hadoop4:8081C:修改conf/workers (這里將三臺機器都作為worker節點使用)
hadoop4
hadoop5
hadoop6
D:拷貝到其它節點
E:在hadoop4節點啟動
[root@hadoop4 flink-1.11.1]# pwd /home/admin/installed/flink-1.11.1 [root@hadoop4 flink-1.11.1]# bin/start-cluster.shF:訪問集群:http://xxx.xxx.xxxx.xxx:8081/#/overview
1.4.2.Flink-Standalone集群重要參數詳解
jobmanager.memory.process.size: 1600m The total process memory size for the JobManager. taskmanager.memory.process.size: 1728m The total process memory size for the TaskManager taskmanager.numberOfTaskSlots: 20 每臺機器上可用CPU數量 parallelism.default: 1 The parallelism used for programs that did not specify and other parallelismslot和parallelism總結
1.slot是靜態的概念,是指taskmanager具有的并發執行能力。
2.parallelism是動態的概念,是指程序運行時實際使用的并發能力。
3.設置合適的parallelism能提高運算效率,太多了和太少了都不行。
1.4.3.集群節點重啟及擴容
1.4.3.1.啟動jobmanager
如果集群中的jobmanager進程掛了,執行下面命令啟動。
bin/jobmanager.sh start bin/jobmanager.sh stop1.4.3.2.啟動taskmanger
添加新的taskmanager節點或者重啟taskmanager節點
bin/taskmanager.sh start bin/taskmanager.sh stop1.4.3.3.Flink standalone集群中job的容錯
?jobmanager掛掉
?正在執行的任務會失敗
?存在單點故障,(Flink支持HA)
?taskmanager掛掉
如果有多余的taskmanger節點,flink會自動把任務調度到其它節點執行。
1.4.4.Flink on Yarn
FLINK on yarn模式的原理是依靠YARN來調度FLINK任務,目前在企業中使用較多。這種模式的好處是可以充分利用集群資源,提高集群機器的利用率,并且只需要1套Hadoop集群,就可以執行MapReduce、Spark和FLINK任務,操作非常方便,運維方面也很輕松。
1.4.4.1.原理介紹
1)當啟動一個新的 Flink YARN Client 會話時,客戶端首先會檢查所請求的資源(容器和內存)是否可用。之后,它會上傳 Flink 配置和 JAR 文件到 HDFS。。
2)客 戶 端 請 求 一個 YARN 容 器 啟動 ApplicationMaster 。 JobManager 和ApplicationMaster(AM)運行在同一個容器中,一旦它們成功地啟動了,AM 就能夠知道JobManager 的地址,它會為 TaskManager 生成一個新的 Flink 配置文件(這樣它才能連上 JobManager),該文件也同樣會被上傳到 HDFS。另外,AM 容器還提供了 Flink 的Web 界面服務。Flink 用來提供服務的端口是由用戶和應用程序 ID 作為偏移配置的,這使得用戶能夠并行執行多個 YARN 會話。
3)之后,AM 開始為 Flink 的 TaskManager 分配容器(Container),從 HDFS 下載 JAR 文件和修改過的配置文件。一旦這些步驟完成了,Flink 就安裝完成并準備接受任務了。
Flink n on n Yarn 模式在使用的時候又可以分為兩Session-Cluster和Per-Job-Cluster
Session-Cluster
這種模式是在 YARN 中提前初始化一個 Flink 集群(稱為 Flinkyarn-session),開辟指定的資源,以后的 Flink 任務都提交到這里。這個 Flink 集群會常駐在 YARN 集群中,除非手工停止。這種方式創建的 Flink 集群會獨占資源,不管有沒有 Flink 任務在執行,YARN 上面的其他任務都無法使用這些資源。
Per-Job-Cluster
這種模式,每次提交 Flink 任務都會創建一個新的 Flink 集群,每個 Flink 任務之間相互獨立、互不影響,管理方便。任務執行完成之后創建的 Flink集群也會消失,不會額外占用資源,按需使用,這使資源利用率達到最大,在工作中推薦使用這種模式。
1.4.4.2.FLINK on yarn集群部署
?依賴環境
?本次部署hadoop3.1.0版本
?Hdfs & yarn
?Flink on Yarn的兩種使用方式
第一種: 在Yarn中初始化一個flink集群,開辟指定的資源,以后提交任務都向這里提交。這個flink集群會常駐在Yarn集群中,除非手工停止。
第二中(推薦): 每次提交都會創建一個新的flink集群,任務之間互相獨立,互不影響,方便管理。任務執行完成之后創建的集群也會消失。
1.4.5.Flink on Yarn的兩種運行方式
1.4.5.1.第一種[yarn-session.sh(開辟資源)+flink run(提交任務)]
啟動一個一直運行的flink集群
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]附著到一個已存在的flink yarn session
./bin/yarn-session.sh -id application_1463870264508_0029執行任務
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hadoop100:9000/LICENSE -output hdfs://hadoop100:9000/wordcount-result.txt停止任務 【web界面或者命令行執行cancel命令】
關于命令含義:
[root@hadoop4 flink-1.11.1]# ./bin/yarn-session.sh --help Usage:必選:-n,--container <arg> 表示分配容器的數量(也就是 TaskManager 的數量)可選-at,--applicationType <arg> Set a custom application type for the application on YARN-D <property=value> use value for given property (動態屬性)-d,--detached If present, runs the job in detached mode (獨立運行)-h,--help Help for the Yarn session CLI.-id,--applicationId <arg> Attach to running YARN session (附著到一個已存在的flink yarn session)-j,--jar <arg> Path to Flink jar file -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) (JobManager容器的內存大小)-m,--jobmanager <arg> Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.-nl,--nodeLabel <arg> Specify YARN node label for the YARN application-nm,--name <arg> Set a custom name for the application on YARN (在YARN上為一個自定義的應用設置一個名字)-q,--query Display available YARN resources (memory, cores) 顯示yarn中可用的資源 (內存, cpu核數)-qu,--queue <arg> Specify YARN queue. 指定YARN隊列-s,--slots <arg> taskmanager分配多少個slots(處理進程)。建議設置為每個機器的CPU核數-t,--ship <arg> Ship files in the specified directory (t for transfer) -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) 每個TaskManager容器的內存 [in MB]-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode 針對HA模式在zookeeper上創建NameSpace1.4.5.2.第二種[flink run -m yarn-cluster(開辟資源 + 提交任務)]
啟動集群,執行任務
$FLINK_HOME/bin/flink run -d -m yarn-cluster \ -p 1 \ -yjm 1024m \ -ytm 1024m \ -ynm IssuePassComplete \ -c com.tianque.issue.flink.handler.IssuePassCompleteFlinkHandlerByCustomRedisSink \ /home/admin/installed/flink-jars/IssuePassCompleteFlinkHandler.jar \ --port 38092注意:client端必須要設置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME環境變量,通過這個環境變量來讀取YARN和HDFS的配置信息,否則啟動會失敗
1.4.5.3…/bin/yarn-session.sh命令分析
用法: 必選 -n,--container <arg> 分配多少個yarn容器 (=taskmanager的數量) 可選 -D <arg> 動態屬性 -d,--detached 獨立運行 -jm,--jobManagerMemory <arg> JobManager的內存 [in MB] -nm,--name 在YARN上為一個自定義的應用設置一個名字 -q,--query 顯示yarn中可用的資源 (內存, cpu核數) -qu,--queue <arg> 指定YARN隊列. -s,--slots <arg> 每個TaskManager使用的slots數量 -tm,--taskManagerMemory <arg> 每個TaskManager的內存 [in MB] -z,--zookeeperNamespace <arg> 針對HA模式在zookeeper上創建NameSpace -id,--applicationId <yarnAppId> YARN集群上的任務id,附著到一個后臺運行的yarn session中1.4.5.4./bin/flink run命令分析
run [OPTIONS] <jar-file> <arguments> "run" 操作參數: -c,--class <classname> 如果沒有在jar包中指定入口類,則需要在這里通過這個參數指定 -m,--jobmanager <host:port> 指定需要連接的jobmanager(主節點)地址,使用這個參數可以指定一個不同于配置文件中的jobmanager -p,--parallelism <parallelism> 指定程序的并行度。可以覆蓋配置文件中的默認值。默認查找當前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】: ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1連接指定host和port的jobmanager: ./bin/flink run -m hadoop100:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1啟動一個新的yarn-session: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 注意:yarn session命令行的選項也可以使用./bin/flink 工具獲得。它們都有一個y或者yarn的前綴 例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar再如以下運行案例:
如果一直報akka,AskTimeoutException錯誤,可以嘗試添加akka.ask.timeout=120000s, 依然顯示該錯誤。
注意這里數字和時間單位之間,必須有個空格。
1.4.5.5.Flink在yarn上的分布
?Flink on Yarn
?ResourceManager
?NodeManager
?AppMaster(jobmanager和它運行在一個Container中)
?Container(taskmanager運行在上面)
?使用on-yarn的好處
?提高集群機器的利用率
?一套集群,可以執行MR任務,spark任務,flink任務等…
1.4.5.6.Flink on yarn內部實現
步驟如下:
1、上傳flink jar包和配置
2、申請資源和請求AppMaster容器
3、分配AppMaster容器
分配worker
總結
以上是生活随笔為你收集整理的04_Flink-HA高可用、Standalone集群模式、Flink-Standalone集群重要参数详解、集群节点重启及扩容、启动组件、Flink on Yarn、启动命令等的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 03_Flink本地安装、分别解压sca
- 下一篇: 浮盈是什么意思