Flink编程入门(二)
Flink 有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。
?
1.1.?Local模式
?
對于 Local 模式來說,JobManager 和 TaskManager 會公用一個 JVM 來完成 Workload。如果要驗證一個簡單的應用,Local 模式是最方便的。實際應用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是將安裝包解壓啟動(./bin/start-local.sh)即可,在這里不在演示。
1.2.?Standalone 模式?
1.2.1.?下載
?
安裝包下載地址:http://flink.apache.org/downloads.html
?
快速入門教程地址:
?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html
?
?
1.2.2.?上傳安裝包到linux系統
?
使用rz命令
?
1.2.3.?解壓
?
tar –zxvf flink-1.3.2-bin-hadoop26-scala_2.10.tgz
?
1.2.4.?重命名
?
mv flink-1.3.2 flink
?
1.2.5.?修改環境變量
?
切換到root用戶配置
?
export FLINK_HOME=/home/hadoop/flink
?
export PATH=$PATH:$FLINK_HOME/bin
?
配置結束后切換會普通用戶
?
source /etc/profile
?
1.2.6.?修改配置文件
?
修改flink/conf/masters
?
master1:8081
?
修改flink/conf/slaves
?
master1ha
master2
master2ha
?
修改flink/conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 2
?jobmanager.rpc.address: master1
?
1.2.7.?啟動flink
?
/home/Hadoop/flink/bin/start-cluster.sh
?
1.2.8.?Flink 的 Rest API
Flink 和其他大多開源的框架一樣,提供了很多有用的 Rest API。不過 Flink 的 RestAPI,目前還不是很強大,只能支持一些 Monitor 的功能。Flink Dashboard 本身也是通過其 Rest 來查詢各項的結果數據。在 Flink RestAPI 基礎上,可以比較容易的將 Flink 的 Monitor 功能和其他第三方工具相集成,這也是其設計的初衷。
?
在 Flink 的進程中,是由 JobManager 來提供 Rest API 的服務。因此在調用 Rest 之前,要確定 JobManager 是否處于正常的狀態。正常情況下,在發送一個 Rest 請求給 JobManager 之后,Client 就會收到一個 JSON 格式的返回結果。由于目前 Rest 提供的功能還不多,需要增強這塊功能的讀者可以在子項目 flink-runtime-web 中找到對應的代碼。其中最關鍵一個類 WebRuntimeMonitor,就是用來對所有的 Rest 請求做分流的,如果需要添加一個新類型的請求,就需要在這里增加對應的處理代碼。下面我例舉幾個常用 Rest API。
?
1.查詢 Flink 集群的基本信息: /overview。示例命令行格式以及返回結果如下:
?
$ curl http://localhost:8081/overview
?
{"taskmanagers":1,"slots-total":16,
?
"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}
?
2.查詢當前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回結果如下:
?
$ curl http://localhost:8081/jobs
?
{"jobs-running":[],"jobs-finished":
?
["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}
?
3.查詢一個指定的 Job 信息: /jobs/jobid。這個查詢的結果會返回特別多的詳細的內容,這是我在瀏覽器中進行的測試,如下圖:
?
想要了解更多 Rest 請求內容的讀者,可以去 Apache Flink 的頁面中查找。
?
1.2.9.?運行測試任務
?
./bin/flink run -m master1:8082 ./examples/batch/WordCount.jar --input hdfs://master1:9000/words.txt --output hdfs://master1:9000/clinkout
?
1.3.?Flink 的 HA
?
首先,我們需要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對于 Standalone 來說,Flink 必須依賴于 Zookeeper 來實現 JobManager 的 HA(Zookeeper 已經成為了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個 Standalone 的 Flink 集群會同時有多個活著的 JobManager,其中只有一個處于工作狀態,其他處于 Standby 狀態。當工作中的 JobManager 失去連接后(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 集群。
?
對于 Yarn Cluaster 模式來說,Flink 就要依靠 Yarn 本身來對 JobManager 做 HA 了。其實這里完全是 Yarn 的機制。對于 Yarn Cluster 模式來說,JobManager 和 TaskManager 都是被 Yarn 啟動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之為 Flink Application Master。也就說它的故障恢復,就完全依靠著 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一樣)。由于完全依賴了 Yarn,因此不同版本的 Yarn 可能會有細微的差異。這里不再做深究。
?
1.3.1.?修改配置文件
?
修改flink-conf.yaml
?
state.backend: filesystem
?
state.backend.fs.checkpointdir: hdfs://master1:9000/flink-checkpoints
?
high-availability: zookeeper
?
high-availability.storageDir: hdfs://master1:9000/flink/ha/
high-availability.zookeeper.quorum: master1ha:2181,master2:2181,master2ha:2181
high-availability.zookeeper.client.acl: open
修改conf
server.1=master1ha:2888:3888
server.2=master2:2888:3888?
server.3=master2ha:2888:3888
?
修改masters
?
master1:8082
?
master1ha:8082
?
修改slaves
?
master1ha
?
master2
?
master2ha
?
1.3.2.?啟動
?
/home/Hadoop/flink/bin/start-cluster.sh
?
1.4.?Yarn Cluster 模式
?
1.4.1.?引入
?
在一個企業中,為了最大化的利用集群資源,一般都會在一個集群中同時運行多種類型的 Workload。因此 Flink 也支持在 Yarn 上面運行。首先,讓我們通過下圖了解下 Yarn 和 Flink 的關系。
?
在圖中可以看出,Flink 與 Yarn 的關系與 MapReduce 和 Yarn 的關系是一樣的。Flink 通過 Yarn 的接口實現了自己的 App Master。當在 Yarn 中部署了 Flink,Yarn 就會用自己的 Container 來啟動 Flink 的 JobManager(也就是 App Master)和 TaskManager。
?
1.4.2.?修改環境變量
?
export HADOOP_CONF_DIR= /home/hadoop/hadoop/etc/hadoop
?
1.4.3.?部署啟動
?
yarn-session.sh -d -s 2 -tm 800 -n 2
?
上面的命令的意思是,同時向Yarn申請3個container,其中 2 個 Container 啟動 TaskManager(-n 2),每個 TaskManager 擁有兩個 Task Slot(-s 2),并且向每個 TaskManager 的 Container 申請 800M 的內存,以及一個ApplicationMaster(Job Manager)。
?
?
?
Flink部署到Yarn Cluster后,會顯示Job Manager的連接細節信息。
?
Flink on Yarn會覆蓋下面幾個參數,如果不希望改變配置文件中的參數,可以動態的通過-D選項指定,如 -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
?
jobmanager.rpc.address:因為JobManager會經常分配到不同的機器上
?
taskmanager.tmp.dirs:使用Yarn提供的tmp目錄
?
parallelism.default:如果有指定slot個數的情況下
?
yarn-session.sh會掛起進程,所以可以通過在終端使用CTRL+C或輸入stop停止yarn-session。
?
如果不希望Flink Yarn client長期運行,Flink提供了一種detached YARN session,啟動時候加上參數-d或—detached
?
?
?
在上面的命令成功后,我們就可以在 Yarn Application 頁面看到 Flink 的紀錄。如下圖。
?
如果在虛擬機中測試,可能會遇到錯誤。這里需要注意內存的大小,Flink 向 Yarn 會申請多個 Container,但是 Yarn 的配置可能限制了 Container 所能申請的內存大小,甚至 Yarn 本身所管理的內存就很小。這樣很可能無法正常啟動 TaskManager,尤其當指定多個 TaskManager 的時候。因此,在啟動 Flink 之后,需要去 Flink 的頁面中檢查下 Flink 的狀態。這里可以從 RM 的頁面中,直接跳轉(點擊 Tracking UI)。這時候 Flink 的頁面如圖
?
?
yarn-session.sh啟動命令參數如下:
?
Usage: ?
?
???Required ?
?
?????-n,--container <arg> ??Number of YARN container to allocate (=Number of Task Managers) ?
?
???Optional ?
?
?????-D <arg> ???????????????????????Dynamic properties ?
?
?????-d,--detached ??????????????????Start detached ?
?
?????-jm,--jobManagerMemory <arg> ???Memory for JobManager Container [in MB] ?
?
?????-nm,--name ?????????????????????Set a custom name for the application on YARN ?
?
?????-q,--query ?????????????????????Display available YARN resources (memory, cores) ?
?
?????-qu,--queue <arg> ??????????????Specify YARN queue. ?
?
?????-s,--slots <arg> ???????????????Number of slots per TaskManager ?
?
?????-st,--streaming ????????????????Start Flink in streaming mode ?
?
?????-tm,--taskManagerMemory <arg> ??Memory per TaskManager Container [in MB] ?
?
1.4.4.?提交任務
?
之后,我們可以通過這種方式提交我們的任務
?
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
?
以上命令在參數前加上y前綴,-yn表示TaskManager個數。
?
在這個模式下,同樣可以使用-m yarn-cluster提交一個"運行后即焚"的detached yarn(-yd)作業到yarn cluster。
?
?
?
1.4.5.?停止yarn cluster
?
yarn application -kill application_1507603745315_0001
?
轉載于:https://www.cnblogs.com/advise09/p/10194917.html
總結
以上是生活随笔為你收集整理的Flink编程入门(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JConsole连接远程linux服务器
- 下一篇: ReentrantLock学习