flink 不设置水印_从0到1学习Flink—— Flink parallelism 和 Slot 介绍
前言
之所以寫這個是因為前段時間自己的項目出現(xiàn)過這樣的一個問題:
1Caused?by:?akka.pattern.AskTimeoutException:?2Ask?timed?out?on?[Actor[akka://flink/user/taskmanager_0#15608456]]?after?[10000?ms].?
3Sender[null]?sent?message?of?type?"org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".
跟著這問題在 Flink 的 Issue 列表里看到了一個類似的問題:https://issues.apache.org/jira/browse/FLINK-9056
,看下面的評論差不多就是 TaskManager 的 slot 數(shù)量不足的原因,導(dǎo)致 job 提交失敗。在 Flink 1.63 中已經(jīng)修復(fù)了變成拋出異常了。
竟然知道了是因為 slot 不足的原因了,那么我們就要先了解下 slot 是什么東東呢?不過文章這里先介紹下 parallelism。
什么是 parallelism?
如翻譯這樣,parallelism 是并行的意思,在 Flink 里面代表每個任務(wù)的并行度,適當(dāng)?shù)奶岣卟⑿卸瓤梢源蟠筇岣?job 的執(zhí)行效率,比如你的 job 消費 kafka 數(shù)據(jù)過慢,適當(dāng)調(diào)大可能就消費正常了。
那么在 Flink 中怎么設(shè)置并行度呢?
如何設(shè)置 parallelism?
如上圖,在 flink 配置文件中可以查看到默認并行度是 1,
1cat?flink-conf.yaml?|?grep?parallelism2
3#?The?parallelism?used?for?programs?that?did?not?specify?and?other?parallelism.
4parallelism.default:?1
所以你如何在你的 flink job 里面不設(shè)置任何的 parallelism 的話,那么他也會有一個默認的 parallelism = 1。那也意味著你可以修改這個配置文件的默認并行度。
如果你是用命令行啟動你的 Flink job,那么你也可以這樣設(shè)置并行度(使用 -p 并行度):
1./bin/flink?run?-p?10?../word-count.jar你也可以通過這樣來設(shè)置你整個程序的并行度:
1StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();2env.setParallelism(10);
注意:這樣設(shè)置的并行度是你整個程序的并行度,那么后面如果你的每個算子不單獨設(shè)置并行度覆蓋的話,那么后面每個算子的并行度就都是這里設(shè)置的并行度的值了。
如何給每個算子單獨設(shè)置并行度呢?
1data.keyBy(new?xxxKey())2????.flatMap(new?XxxFlatMapFunction()).setParallelism(5)
3????.map(new?XxxMapFunction).setParallelism(5)
4????.addSink(new?XxxSink()).setParallelism(1)
如上,就是在每個算子后面單獨的設(shè)置并行度,這樣的話,就算你前面設(shè)置了 env.setParallelism(10) 也是會被覆蓋的。
這也說明優(yōu)先級是:算子設(shè)置并行度 > env 設(shè)置并行度 > 配置文件默認并行度
并行度講到這里應(yīng)該都懂了,下面 zhisheng 就繼續(xù)跟你講講 什么是 slot?
什么是 slot?
其實什么是 slot 這個問題之前在第一篇文章?《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹?中就介紹過了,這里再講細一點。
圖中 Task Manager 是從 Job Manager 處接收需要部署的 Task,任務(wù)的并行性由每個 Task Manager 上可用的 slot 決定。每個任務(wù)代表分配給任務(wù)槽的一組資源,slot 在 Flink 里面可以認為是資源組,Flink 將每個任務(wù)分成子任務(wù)并且將這些子任務(wù)分配到 slot 來并行執(zhí)行程序。
例如,如果 Task Manager 有四個 slot,那么它將為每個 slot 分配 25% 的內(nèi)存。 可以在一個 slot 中運行一個或多個線程。 同一 slot 中的線程共享相同的 JVM。 同一 JVM 中的任務(wù)共享 TCP 連接和心跳消息。Task Manager 的一個 Slot 代表一個可用線程,該線程具有固定的內(nèi)存,注意 Slot 只對內(nèi)存隔離,沒有對 CPU 隔離。默認情況下,Flink 允許子任務(wù)共享 Slot,即使它們是不同 task 的 subtask,只要它們來自相同的 job。這種共享可以有更好的資源利用率。
文字說的比較干,zhisheng 這里我就拿下面的圖片來講解:
上面圖片中有兩個 Task Manager,每個 Task Manager 有三個 slot,這樣我們的算子最大并行度那么就可以達到 6 個,在同一個 slot 里面可以執(zhí)行 1 至多個子任務(wù)。
那么再看上面的圖片,source/map/keyby/window/apply 最大可以有 6 個并行度,sink 只用了 1 個并行。
每個 Flink TaskManager 在集群中提供 slot。 slot 的數(shù)量通常與每個 TaskManager 的可用 CPU 內(nèi)核數(shù)成比例。一般情況下你的 slot 數(shù)是你每個 TaskManager 的 cpu 的核數(shù)。
但是 flink 配置文件中設(shè)置的 task manager 默認的 slot 是 1。
slot 和 parallelism
下面給出官方的圖片來更加深刻的理解下 slot:
1、slot 是指 taskmanager 的并發(fā)執(zhí)行能力
taskmanager.numberOfTaskSlots:3
每一個 taskmanager 中的分配 3 個 TaskSlot, 3 個 taskmanager 一共有 9 個 TaskSlot。
2、parallelism 是指 taskmanager 實際使用的并發(fā)能力
parallelism.default:1
運行程序默認的并行度為 1,9 個 TaskSlot 只用了 1 個,有 8 個空閑。設(shè)置合適的并行度才能提高效率。
3、parallelism 是可配置、可指定的
上圖中 example2 每個算子設(shè)置的并行度是 2, example3 每個算子設(shè)置的并行度是 9。
example4 除了 sink 是設(shè)置的并行度為 1,其他算子設(shè)置的并行度都是 9。
好了,既然并行度和 slot zhisheng 都帶大家過了一遍了,那么再來看文章開頭的問題:slot 資源不夠。
問題原因
現(xiàn)在這個問題的答案其實就已經(jīng)很明顯了,就是我們設(shè)置的并行度 parallelism 超過了 Task Manager 能提供的最大 slot 數(shù)量,所以才會報這個錯誤。
再來拿我的代碼來看吧,當(dāng)時我就是只設(shè)置了整個項目的并行度:
1env.setParallelism(15);為什么要設(shè)置 15 呢,因為我項目消費的 Kafka topic 有 15 個 parttion,就想著讓一個并行去消費一個 parttion,沒曾想到 Flink 資源的不夠,稍微降低下 并行度為 10 后就沒出現(xiàn)這個錯誤了。
總結(jié)
本文由自己項目生產(chǎn)環(huán)境的一個問題來講解了自己對 Flink parallelism 和 slot 的理解,并告訴大家如何去設(shè)置這兩個參數(shù),最后也指出了問題的原因所在。
關(guān)注我
轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2019/01/14/Flink-parallelism-slot/?, 未經(jīng)允許禁止轉(zhuǎn)載。
微信公眾號:zhisheng
另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink?即可無條件獲取到。
Github 代碼倉庫
https://github.com/zhisheng17/flink-learning/
以后這個項目的所有代碼都將放在這個倉庫里,包含了自己學(xué)習(xí) flink 的一些 demo 和博客
相關(guān)文章
1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹
2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運行簡單程序入門
3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解
4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹
5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?
6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹
7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?
8、《從0到1學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)
9、《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows
10、《從0到1學(xué)習(xí)Flink》—— Flink 中的幾種 Time 詳解
11、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 ElasticSearch
12、《從0到1學(xué)習(xí)Flink》—— Flink 項目如何運行?
13、《從0到1學(xué)習(xí)Flink》—— Flink 寫入數(shù)據(jù)到 Kafka
14、《從0到1學(xué)習(xí)Flink》—— Flink JobManager 高可用性配置
總結(jié)
以上是生活随笔為你收集整理的flink 不设置水印_从0到1学习Flink—— Flink parallelism 和 Slot 介绍的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php乱码调试,NotePad++ 调试
- 下一篇: 手机访问服务器中的数据库文件,手机连接服