Apache Flink 零基础入门(二十)Flink部署与作业的提交
之前我們都是基于Idea在本地進行開發,這種方式很適合開發以及測試,但是開發完之后,如何提交到服務器中運行?
Flink單機部署方式
本地開發和測試過程中非常有用,只要把代碼放到服務器直接運行。
前置條件
jdk8
maven3
下載解壓Flink,這里直接下載源碼編譯,直接從github上下載源碼https://github.com/apache/flink/releases 選擇1.8.1
然后解壓到本地,解壓后的文件夾如下:
編譯
接下來我們需要編譯這段源碼。
mvn clean install -DskipTests -Dfast -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.15.1第一次編譯需要花費很長時間,因為需要從網上下載一些依賴文件。
編譯結果
編譯成功后,會在當前目錄下產生flink-dist/target/flink-1.8.1-bin文件夾,這個文件夾就是我們所需要的東西,把這個文件夾拷貝到服務器中就可以進行部署了。
單機模式部署
將flink-1.8.1-bin文件夾拷貝到服務器中,服務器中的目錄如下:
啟動服務
當前是stand-alone模式,輸入jps可以查看:
可以在UI上面看到,默認ip是8081:
使用Flink跑一個WordCount程序
使用socket方式讀入數據,然后統計數據wordcount
在服務器中開啟socket:nc -lk 9999
然后運行命令:
在WEB界面中可以查看任務:
測試數據
在nc終端輸入數據(使用tab分割):
查看運行結果
在flink-1.8.1/log路徑下,有一個flink--taskexecutor-.out文件,就是輸出結果文件。
停止任務
可以在WEB界面上點擊Running JOB,然后點擊任務:
點擊Cancel就可以取消這個任務了。這時,取消成功之后,可用slot就變為1了:
停止集群
使用命令./bin/stop-cluster.sh就可以了
Flink分布式standalone部署方式
我們在conf/flink-conf.yaml文件中配置主節點(jobmanager.rpc.address)的ip.
在conf/slaves文件中配置從節點(taskmanager)的ip
常用配置
jobmanager.rpc.address參數用來指向master節點的地址
jobmanager.heap.size 表示jobmanager節點可用的內存
taskmanager.heap.size表示taskmanager節點可用的內存
taskmanager.numberOfTaskSlots 每一個機器可用的CPU個數,決定了并行度
paraparallelism.default 表示任務的并行度 可以在代碼層面覆蓋
taskmanager.tmp.dirs taskmanager的臨時數據存儲目錄
修改配置文件
修改conf/flink-conf.yaml文件:
jobmanager.rpc.address: swarm-manager(修改為本服務器的主機名)
修改conf/slaves文件:swarm-manager
使用命令./bin/start-cluster.sh啟動集群,可以正常啟動:
修改配置:
taskmanager.heap.size: 2048m
taskmanager.numberOfTaskSlots: 2
再次啟動集群:
可以看到配置已經生效了。
這時再次使用命令nc -lk 9999 然后在另一個終端運行./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999 開啟一個任務。這時查看頁面:
可以看到之前兩個slots,現在已經用了一個,因為當前的并行度是1。
擴展和容錯
現在我們輸入jps命令時:
我們現在把TaskManagerRunner給Kill掉,然后在查看:
這時在查看Web頁面:
可以看到都變成0了。
這時再啟動taskmanager.sh
再次查看:
yarn方式部署
搭建Hadoop
下載Hadoop 2.6.0 解壓,配置環境變量:
修改配置文件etc/hadoop/hadoop-env.sh ,配置java home export JAVA_HOME=/usr/jdk1.8.0_101
修改配置文件etc/hadoop/core-site.xml,配置:
修改配置文件etc/hadoop/hdfs-site.xml,配置:
<configuration><property><name>dfs.replication</name><value>1</value></property><property><name>hadoop.tmp.dir</name><value>/home/iie4bu/app/tmp</value></property> </configuration>修改配置文件slave 內容:swarm-manager
接下來配置yarn-site.xml
修改mapred-site.xml.template文件名為:mapred-site.xml ,內容如下:
<configuration><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property> </configuration>格式化hdfs,使用命令~/app/hadoop-2.6.0/bin$ ./hdfs namenode -format
啟動hadoop,使用命令~/app/hadoop-2.6.0/sbin$ ./start-all.sh
使用jps查看啟動狀態:
使用WEB頁面進行查看
首先查看hdfs:http://192.168.170.170:50070
然后查看yarn:http://192.168.170.170:8088
測試一下
測試HDFS :
沒有問題!
Flink On Yarn
方式一:Flink初始化的時候就申請一個資源,并且作為一個服務常駐在yarn中。Flink 的job共用一個yarn session。資源不夠用時,出現排隊情況。
從flink1.8之后hadoop相關jar包需要額外下載:
將下載后的flink-shaded-hadoop-2-uber-2.6.5-7.0.jar文件放到 flink-1.8.1/lib目錄下,然后可以輸入 ./bin/yarn-session.sh --help
./bin/yarn-session.sh參數:
方式二: 每次提交Flink job都申請一個yarn session,用完之后釋放。任務之間不相互影響,任務之間是相互獨立的。
使用方式一
啟動一個常服務./bin/yarn-session.sh -n 1 -jm 1024m -tm 4096m表示啟動job manager是1GB的堆內存,task manager是4GB的堆內存。
使用這個命令之后,打開yarn頁面,http://192.168.170.170:8088/cluster
使用方式二:./bin/flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
總結
以上是生活随笔為你收集整理的Apache Flink 零基础入门(二十)Flink部署与作业的提交的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Flink 零基础入门(二
- 下一篇: Flink分布式standalone部署