Flink 1.10 细粒度资源管理解析
相信不少讀者在開發(fā) Flink 應(yīng)用時或多或少會遇到在內(nèi)存調(diào)優(yōu)方面的問題,比如在我們生產(chǎn)環(huán)境中遇到最多的 TaskManager 在容器化環(huán)境下占用超出容器限制的內(nèi)存而被 YARN/Mesos kill 掉[1],再比如使用 heap-based StateBackend 情況下 State 過大導(dǎo)致 GC 頻繁影響吞吐。這些問題對于不熟悉 Flink 內(nèi)存管理的用戶來說十分難以排查,而且 Flink 晦澀難懂的內(nèi)存配置參數(shù)更是讓用戶望而卻步,結(jié)果是往往將內(nèi)存調(diào)大至一個比較浪費的閾值以盡量避免內(nèi)存問題。
對于作業(yè)規(guī)模不大的普通用戶而言,這些通常在可以接受的范圍之內(nèi),但對于上千并行度的大作業(yè)來說,浪費資源的總量會非常可觀,而且進程的不穩(wěn)定性導(dǎo)致的作業(yè)恢復(fù)時間也會比普通作業(yè)長得多,因此阿里巴巴的 Blink 團隊針對內(nèi)存管理機制做了大量的優(yōu)化,并于近期開始合并到 Flink。本文的內(nèi)容主要基于阿里團隊工程師宋辛童在 Flink Forward Beijing 的分享[2],以及后續(xù)相關(guān)的幾個 FLIP 提案。
Flink 目前(1.9)的內(nèi)存管理
TaskManager 作為 Master/Slave 架構(gòu)中的 Slave 提供了作業(yè)執(zhí)行需要的環(huán)境和資源,最為重要而且復(fù)雜,因此 Flink 的內(nèi)存管理也主要指 TaskManager 的內(nèi)存管理。
TaskManager 的資源(主要是內(nèi)存)分為三個層級,分別是最粗粒度的進程級(TaskManager 進程本身),線程級(TaskManager 的 slot)和 SubTask 級(多個 SubTask 共用一個 slot)。
圖1.TaskManager 資源層級
在進程級,TaskManager 將內(nèi)存劃分為以下幾塊:
- Heap Memory: 由 JVM 直接管理的 heap 內(nèi)存,留給用戶代碼以及沒有顯式內(nèi)存管理的 Flink 系統(tǒng)活動使用(比如 StateBackend、ResourceManager 的元數(shù)據(jù)管理等)。
- Network Memory: 用于網(wǎng)絡(luò)傳輸(比如 shuffle、broadcast)的內(nèi)存 Buffer 池,屬于 Direct Memory 并由 Flink 管理。
- Cutoff Memory: 在容器化環(huán)境下進程使用的物理內(nèi)存有上限,需要預(yù)留一部分內(nèi)存給 JVM 本身,比如線程棧內(nèi)存、class 等元數(shù)據(jù)內(nèi)存、GC 內(nèi)存等。
- Managed Memory: 由 Flink Memory Manager 直接管理的內(nèi)存,是數(shù)據(jù)在 Operator 內(nèi)部的物理表示。Managed Memory 可以被配置為 on-heap 或者 off-heap (direct memory)的,off-heap 的 Managed Memory 將有效減小 JVM heap 的大小并減輕 GC 負(fù)擔(dān)。目前 Managed Memory 只用于 Batch 類型的作業(yè),需要緩存數(shù)據(jù)的操作比如 hash join、sort 等都依賴于它。
根據(jù) Managed Memory 是 on-heap 或 off-heap 的不同,TaskManager 的進程內(nèi)存與 JVM 內(nèi)存分區(qū)關(guān)系分別如下:
圖2.TaskManager 內(nèi)存分區(qū)
在線程級別,TaskManager 會將其資源均分為若干個 slot (在 YARN/Mesos/K8s 環(huán)境通常是每個 TaskManager 只包含 1 個 slot),沒有 slot sharing 的情況下每個 slot 可以運行一個 SubTask 線程。除了 Managed Memory,屬于同一 TaskManager 的 slot 之間基本是沒有資源隔離的,包括 Heap Memory、Network Buffer、Cutoff Memory 都是共享的。所以目前 slot 主要的用處是限制一個 TaskManager 的 SubTask 數(shù)。
從作為資源提供者的 TaskManager 角度看, slot 是資源的最小單位,但從使用者 SubTask 的角度看,slot 的資源還可以被細(xì)分,因為 Flink 的 slot sharing 機制。默認(rèn)情況下, Flink 允許多個 SubTask 共用一個 slot 的資源,前提是這些 SubTask 屬于同一個 Job 的不同 Task。以官網(wǎng)的例子來說,一個拓?fù)錇?Source(6)-map(6)-keyby/window/apply(6)-sink(1) 的作業(yè),可以運行在 2 個 slot 數(shù)為 3 的 TaskManager 上(見圖3)。
圖3.TaskManager Slot Sharing
這樣的好處是,原本一共需要 19 個 slot 的作業(yè),現(xiàn)在只需要作業(yè)中與 Task 最大并行度相等的 slot, 即 6 個 slot 即可運行起來。此外因為不同 Task 通常有不同的資源需求,比如 source 主要使用網(wǎng)絡(luò) IO,而 map 可能主要需要 cpu,將不同 Task 的 subtask 放到同一 slot 中有利于資源的充分利用。
可以看到,目前 Flink 的內(nèi)存管理是比較粗粒度的,資源隔離并不是很完整,而且在不同部署模式下(Standalone/YARN/Mesos/K8s)或不同計算模式下(Streaming/Batch)的內(nèi)存分配也不太一致,為深度平臺化及大規(guī)模應(yīng)用增添了難度。
Flink 1.10 細(xì)粒度的資源管理
為了改進 Flink 內(nèi)存管理機制,阿里巴巴的工程師結(jié)合 Blink 的優(yōu)化經(jīng)驗分別就進程、線程、SubTask(Operator)三個層面分別提出了 3 個 FLIP,均以 1.10 為目標(biāo) release 版本。下面將逐一介紹每個提案的內(nèi)容。
FLIP-49: 統(tǒng)一 TaskExecutor 的內(nèi)存配置
■ 背景
TaskExecutor 在不同部署模式下具體負(fù)責(zé)作業(yè)執(zhí)行的進程,可以簡單視為 TaskManager。目前 TaskManager 的內(nèi)存配置存在不一致以及不夠直觀的問題,具體有以下幾點:
- 流批作業(yè)內(nèi)容配置不一致。Managed Memory 只覆蓋 DataSet API,而 DataStream API 的則主要使用 JVM 的 heap 內(nèi)存,相比前者需要更多的調(diào)優(yōu)參數(shù)且內(nèi)存消耗更難把控。
- RocksDB 占用的 native 內(nèi)存并不在內(nèi)存管理里,導(dǎo)致使用 RocksDB 時內(nèi)存需要很多手動調(diào)優(yōu)。
- 不同部署模式下,Flink 內(nèi)存計算算法不同,并且令人難以理解。
針對這些問題,FLIP-49[4] 提議通過將 Managed Memory 的用途拓展至 DataStream 以解決這個問題。DataStream 中主要占用內(nèi)存的是 StateBackend,它可以從管理 Managed Memory 的 MemoryManager 預(yù)留部分內(nèi)存或分配內(nèi)存。通過這種方式同一個 Flink 配置可以運行 Batch 作業(yè)和 Streaming 作業(yè),有利于流批統(tǒng)一。
■ 改進思路
可以看到目前 DataStream 作業(yè)的內(nèi)存分配沒有經(jīng)過 MemoryManager 而是直接向 JVM 申請,容易造成 heap OOM 或者物理內(nèi)存占用過大[3],因此直接的修復(fù)辦法是讓 MemoryManager 了解到 StateBackend 的內(nèi)存占用。這會有兩種方式,一是直接通過 MemoryManager 申請內(nèi)存,二是仍使用隱式分配的辦法,但需要通知 MemoryManager 預(yù)留這部分內(nèi)存。此外 MemoryManager 申請 off-heap 的方式也會有所變化,從 ByteBuffer#allocateDirect() 變?yōu)?Unsafe#allocateMemory(),這樣的好處是顯式管理的 off-heap 內(nèi)存可以從 JVM 的 -XX:MaxDirectMemorySize 參數(shù)限制中分離出來。
另外 MemoryManager 將不只可以被配置為 heap/off-heap,而是分別擁有對應(yīng)的內(nèi)存池。這樣的好處是在同一個集群可以運行要求不同類型內(nèi)存的作業(yè),比如一個 FsStateBackend 的 DataStream 作業(yè)和一個 RocksDBStateBackend 的 DataStream 作業(yè)。heap/off-heap 的比例可以通過參數(shù)配置,1/0 則代表了完全的 on-heap 或者 off-heap。
改進之后 TaskManager 的各內(nèi)存分區(qū)如下:
TaskManager 新內(nèi)存結(jié)構(gòu)
值得注意的是有 3 個分區(qū)是沒有默認(rèn)值的,包括 Framework Heap Memory、Total Flink Memory 和 Total Process Memory,它們是決定總內(nèi)存的最關(guān)鍵參數(shù),三者分別滿足不同部署模式的需要。比如在 Standalone 默認(rèn)下,用戶可以配置 Framework Heap Memory 來限制用戶代碼使用的 heap 內(nèi)存;而在 YARN 部署模式下,用戶可以通過配置 YARN container 的資源來間接設(shè)置 Total Process Memory。
FLIP-56: 動態(tài) slot 分配
■ 背景
目前 Flink 的資源是預(yù)先靜態(tài)分配的,也就是說 TaskManager 進程啟動后 slot 的數(shù)目和每個 slot 的資源數(shù)都是固定的而且不能改變,這些 slot 的生命周期和 TaskManager 是相同的。Flink Job 后續(xù)只能向 TaskManager 申請和釋放這些 slot,而沒有對 slot 資源數(shù)的話語權(quán)。
圖5. 靜態(tài) slot 分配
這種粗粒度的資源分配假定每個 SubTask 的資源需求都是大致相等的,優(yōu)點是較為簡單易用,缺點在于如果出現(xiàn) SubTask 的資源需求有傾斜的情況,用戶則需要按其中某個 SubTask 最大資源來配置總體資源,導(dǎo)致資源浪費且不利于多個作業(yè)復(fù)用相同 Flink 集群。
■ 改進思路
FLIP-56[5] 提議通過將 TaskManager 的資源改為動態(tài)申請來解決這個問題。TaskManager 啟動的時候只需要確定資源池大小,然后在有具體的 Flink Job 申請資源時再按需動態(tài)分配 slot。Flink Job 申請 slot 時需要附上資源需求,TaskManager 會根據(jù)該需求來確定 slot 資源。
圖6. 動態(tài) slot 分配
值得注意的是,slot 資源需求可以是 unknown。提案引入了一個新的默認(rèn) slot 資源要求配置項,它表示一個 slot 占總資源的比例。如果 slot 資源未知,TaskManager 將按照該比例切分出 slot 資源。為了保持和現(xiàn)有靜態(tài) slot 模型的兼容性,如果該配置項沒有被配置,TaskManager 會根據(jù) slot 數(shù)目均等分資源生成 slot。
目前而言,該 FLIP 主要涉及到 Managed Memory 資源,TaskManager 的其他資源比如 JVM heap 還是多個 slot 共享的。
FLIP-53: 細(xì)粒度的算子資源管理
■ 背景
FLIP-56 使得 slot 的資源可以根據(jù)實際需求確定,而 FLIP-53 則探討了 Operator (算子)層面如何表達(dá)資源需求,以及如何根據(jù)不同 Operator 的設(shè)置來計算出總的 slot 資源。
目前 DataSet API 以及有可以指定 Operator 資源占比的方法(TaskConfig 和 ChainedDriver),因此這個 FLIP 只涉及到 DataStream API 和 Table/SQL API (先在 Blink Planner 實現(xiàn))。不過提案并沒有包括用戶函數(shù) API 上的變化(類似新增 dataStream.setResourceSpec() 函數(shù)),而是主要討論 DataStream 到 StreamGraph 的翻譯過程如何計算 slot 資源。改進完成后,這三個 API 的資源計算邏輯在底層會是統(tǒng)一的。
■ 改進思路
要理解 Flink 內(nèi)部如何劃分資源,首先要對 Flink 如何編譯用戶代碼并部署到分布式環(huán)境的過程有一定的了解。
圖7. Flink 作業(yè)編譯部署流程
以 DataStream API 為例,用戶為 DataStream 新增 Operator 時,Flink 在底層會將以一個對應(yīng)的 Transform 來封裝。比如 dataStream.map(new MyMapFunc()) 會新增一個 OneInputTransformation 實例,里面包括了序列化的 MyMapFunc 實例,以及 Operator 的配置(包括名稱、uid、并行度和資源等),并且記錄了它在拓?fù)渲械那耙粋€ Transformation 作為它的數(shù)據(jù)輸入。
當(dāng) env.execute() 被調(diào)用時,在 client 端 StreamGraphGenerator 首先會遍歷 Transformation 列表構(gòu)造出 StreamGraph 對象(每個 Operator 對應(yīng)一個 StreamNode),然后 StreamingJobGraphGenerator 再將 StreamGraph 翻譯成 DataStream/DataSet/Table/SQL 通用的 JobGraph(此時會應(yīng)用 chaining policy 將可以合并的 Operator 合并為 OperatorChain,每個 OperatorChain 或不能合并的 Operator 對應(yīng)一個 JobVertex),并將其傳給 JobManager。
JobManager 收到 JobGraph 后首先會將其翻譯成表示運行狀態(tài)的 ExecutionGraph,ExecutionGraph 的每個節(jié)點稱為 ExecutionJobVertex,對應(yīng)一個 JobVertex。ExecutionJobVertex 有一個或多個并行度且可能被調(diào)度和執(zhí)行多次,其中一個并行度的一次執(zhí)行稱為 Execution,JobManager 的 Scheduler 會為每個 Execution 分配 slot。
細(xì)粒度的算子資源管理將以下面的方式作用于目前的流程:
值得注意的是,Scheduler 的調(diào)度有分 EAGER 模式和 LAZY_FROM_SOURCE 兩種模式,分別用于 Stream 作業(yè)和 Batch 作業(yè),它們會影響到 slot 的資源計算。Stream 類型的作業(yè)要求所有的 Operator 同時運行,因此資源的需求是急切的(EAGER);而 Batch 類型的作業(yè)可以劃分為多個階段,不同階段的 Operator 不需要同時運行,可以等輸入數(shù)據(jù)準(zhǔn)備好了再分配資源(LAZY_FROM_SOURCE)。這樣的差異導(dǎo)致如果要充分利用 slot,Batch 作業(yè)需要區(qū)分不同階段的 Task,同一時間只考慮一個階段的 Task 資源。
解決的方案是將 slot sharing 的機制拓展至 Batch 作業(yè)。默認(rèn)情況下 Stream 作業(yè)的所有 Operator 都屬于 default sharing group,所以全部 Operator 都能共用都一個 slot。對于 Batch 作業(yè)而言,我們將整個 JobGraph 根據(jù) suffle 劃分為一至多個 Region,每個 Region 屬于獨立的 sharing group,因而不會被放到同一個 slot 里面。
圖8. 不同作業(yè)類型的 Slot Sharing Group
總結(jié)
隨著 Flink 的越來越大規(guī)模地被應(yīng)用于各種業(yè)務(wù),目前資源管理機制的靈活性、易用性不足的問題越發(fā)凸顯,新的細(xì)粒度資源管理機制將大大緩解這個問題。此外,新資源管理機制將統(tǒng)一流批兩者在 runtime 層資源管理,這也為將最終的流批統(tǒng)一打下基礎(chǔ)。對于普通用戶而言,這里的大多數(shù)變動是透明的,主要的影響應(yīng)該是出現(xiàn)新的內(nèi)存相關(guān)的配置項需要了解一下。
參考資料:
1.[[FLINK-13477] Containerized TaskManager killed because of lack of memory overhead](https://issues.apache.org/jira/browse/FLINK-13477)
2.機遇與挑戰(zhàn):Apache Flink 資源管理機制解讀與展望
3.[[FLINK-7289] Memory allocation of RocksDB can be problematic in container environments](https://issues.apache.org/jira/browse/FLINK-7289)
4.FLIP-49: Unified Memory Configuration for TaskExecutors
5.FLIP-56: Dynamic Slot Allocation
6.FLIP-53: Fine Grained Operator Resource Management
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Flink 1.10 细粒度资源管理解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 上去很美的 Serverless 在中国
- 下一篇: 十年沉淀,阿里云发布全球领先的对象存储O