Flink State 最佳实践
本文主要分享與交流 Flink 狀態(tài)使用過程中的一些經(jīng)驗與心得,當然標題取了“最佳實踐”之名,希望文章內(nèi)容能給讀者帶去一些干貨。本文內(nèi)容首先是回顧 state 相關(guān)概念,并認識和區(qū)別不同的 state backend;之后將分別對 state 使用訪問以及 checkpoint 容錯相關(guān)內(nèi)容進行詳細講解,分享一些經(jīng)驗和心得。
State 概念回顧
我們先回顧一下到底什么是 state,流式計算的數(shù)據(jù)往往是轉(zhuǎn)瞬即逝, 當然,真實業(yè)務場景不可能說所有的數(shù)據(jù)都是進來之后就走掉,沒有任何東西留下來,那么留下來的東西其實就是稱之為 state,中文可以翻譯成狀態(tài)。
在下面這個圖中,我們的所有的原始數(shù)據(jù)進入用戶代碼之后再輸出到下游,如果中間涉及到 state 的讀寫,這些狀態(tài)會存儲在本地的 state backend(可以對標成嵌入式本地 kv 存儲)當中。
接下來我們會在四個維度來區(qū)分兩種不同的 state:operator state 以及 keyed state。
1. 是否存在當前處理的 key(current key):operator state 是沒有當前 key 的概念,而 keyed state 的數(shù)值總是與一個 current key 對應。
2. 存儲對象是否 on heap: 目前 operator state backend 僅有一種 on-heap 的實現(xiàn);而 keyed state backend 有 on-heap 和 off-heap(RocksDB)的多種實現(xiàn)。
3. 是否需要手動聲明快照(snapshot)和恢復 (restore) 方法:operator state 需要手動實現(xiàn) snapshot 和 restore 方法;而 keyed state 則由 backend 自行實現(xiàn),對用戶透明。
4. 數(shù)據(jù)大小:一般而言,我們認為 operator state 的數(shù)據(jù)規(guī)模是比較小的;認為 keyed state 規(guī)模是相對比較大的。需要注意的是,這是一個經(jīng)驗判斷,不是一個絕對的判斷區(qū)分標準。
StateBackend 的分類
下面這張圖對目前廣泛使用的三類 state backend 做了區(qū)分,其中綠色表示所創(chuàng)建的operator/keyed state backend 是 on-heap 的,黃色則表示是 off-heap 的。
一般而言,在生產(chǎn)中,我們會在 FsStateBackend 和 RocksDBStateBackend 間選擇:
- FsStateBackend:性能更好;日常存儲是在堆內(nèi)存中,面臨著 OOM 的風險,不支持增量 checkpoint。
- RocksDBStateBackend:無需擔心 OOM 風險,是大部分時候的選擇。
RocksDB StateBackend 概覽和相關(guān)配置討論
RocksDB 是 Facebook 開源的 LSM 的鍵值存儲數(shù)據(jù)庫,被廣泛應用于大數(shù)據(jù)系統(tǒng)的單機組件中。Flink 的 keyed state 本質(zhì)上來說就是一個鍵值對,所以與 RocksDB 的數(shù)據(jù)模型是吻合的。下圖分別是 “window state” 和 “value state” 在 RocksDB 中的存儲格式,所有存儲的 key,value 均被序列化成 bytes 進行存儲。
在 RocksDB 中,每個 state 獨享一個 Column Family,而每個 Column family 使用各自獨享的 write buffer 和 block cache,上圖中的 window state 和 value state實際上分屬不同的 column family。
下面介紹一些對 RocksDB 性能比較有影響的參數(shù),并整理了一些相關(guān)的推薦配置,至于其他配置項,可以參閱社區(qū)相關(guān)文檔。
| state.backend.rocksdb.thread.num | 后臺 flush 和 compaction 的線程數(shù). 默認值 ‘1‘. 建議調(diào)大 |
| state.backend.rocksdb.writebuffer.count | 每個 column family 的 write buffer 數(shù)目,默認值 ‘2‘. 如果有需要可以適當調(diào)大 |
| state.backend.rocksdb.writebuffer.size | 每個 write buffer 的 size,默認值‘64MB‘. 對于寫頻繁的場景,建議調(diào)大 |
| state.backend.rocksdb.block.cache-size | 每個 column family 的 block cache大小,默認值‘8MB’,如果存在重復讀的場景,建議調(diào)大 |
State best practice:一些使用 state 的心得
Operator state 使用建議
■ 慎重使用長 list
下圖展示的是目前 task 端 operator state 在執(zhí)行完 checkpoint 返回給 job master 端的 StateMetaInfo 的代碼片段。
由于 operator state 沒有 key group 的概念,所以為了實現(xiàn)改并發(fā)恢復的功能,需要對 operator state 中的每一個序列化后的元素存儲一個位置偏移 offset,也就是構(gòu)成了上圖紅框中的 offset 數(shù)組。
那么如果你的 operator state 中的 list 長度達到一定規(guī)模時,這個 offset 數(shù)組就可能會有幾十 MB 的規(guī)模,關(guān)鍵這個數(shù)組是會返回給 job master,當 operator 的并發(fā)數(shù)目很大時,很容易觸發(fā) job master 的內(nèi)存超用問題。我們遇到過用戶把 operator state 當做黑名單存儲,結(jié)果這個黑名單規(guī)模很大,導致一旦開始執(zhí)行 checkpoint,job master 就會因為收到 task 發(fā)來的“巨大”的 offset 數(shù)組,而內(nèi)存不斷增長直到超用無法正常響應。
■ 正確使用 UnionListState
union list state 目前被廣泛使用在 kafka connector 中,不過可能用戶日常開發(fā)中較少遇到,他的語義是從檢查點恢復之后每個并發(fā) task 內(nèi)拿到的是原先所有operator 上的 state,如下圖所示:
kafka connector 使用該功能,為的是從檢查點恢復時,可以拿到之前的全局信息,如果用戶需要使用該功能,需要切記恢復的 task 只取其中的一部分進行處理和用于下一次 snapshot,否則有可能隨著作業(yè)不斷的重啟而導致 state 規(guī)模不斷增長。
Keyed state 使用建議
■ 如何正確清空當前的 state
state.clear() 實際上只能清理當前 key 對應的 value 值,如果想要清空整個 state,需要借助于 applyToAllKeys 方法,具體代碼片段如下:
如果你的需求中只是對 state 有過期需求,借助于 state TTL 功能來清理會是一個性能更好的方案。
■ RocksDB 中考慮 value 值很大的極限場景
受限于 JNI bridge API 的限制,單個 value 只支持 2^31 bytes 大小,如果存在很極限的情況,可以考慮使用 MapState 來替代 ListState 或者 ValueState,因為RocksDB 的 map state 并不是將整個 map 作為 value 進行存儲,而是將 map 中的一個條目作為鍵值對進行存儲。
■ 如何知道當前 RocksDB 的運行情況
比較直觀的方式是打開 RocksDB 的 native metrics ,在默認使用 Flink managed memory 方式的情況下,state.backend.rocksdb.metrics.block-cache-usage ,state.backend.rocksdb.metrics.mem-table-flush-pending,state.backend.rocksdb.metrics.num-running-compactions 以及 state.backend.rocksdb.metrics.num-running-flushes 是比較重要的相關(guān) metrics。
下面這張圖是 Flink-1.10 之后,打開相關(guān) metrics 的示例圖:
而下面這張是 Flink-1.10 之前或者關(guān)閉 state.backend.rocksdb.memory.managed 的效果:
■ 容器內(nèi)運行的 RocksDB 的內(nèi)存超用問題
在 Flink-1.10 之前,由于一個 state 獨占若干 write buffer 和一塊 block cache,所以我們會建議用戶不要在一個 operator 內(nèi)創(chuàng)建過多的 state,否則需要考慮到相應的額外內(nèi)存使用量,否則容易造成在容器內(nèi)運行時,相關(guān)進程被容器環(huán)境所殺。對于用戶來說,需要考慮一個 slot 內(nèi)有多少 RocksDB 實例在運行,一個 RocksDB 中有多少 state,整體的計算規(guī)則就很復雜,很難真得落地實施。
Flink-1.10 之后,由于引入了 RocksDB 的內(nèi)存托管機制,在絕大部分情況下, RocksDB 的這一部分 native 內(nèi)存是可控的,不過受限于 RocksDB 的相關(guān) cache 實現(xiàn)限制(這里暫不展開,后續(xù)會有文章討論),在某些場景下,無法做到完美控制,這時候建議打開上文提到的 native metrics,觀察相關(guān) block cache 內(nèi)存使用是否存在超用情況,可以將相關(guān)內(nèi)存添加到 taskmanager.memory.task.off-heap.size 中,使得 Flink 有更多的空間給 native 內(nèi)存使用。
一些使用 checkpoint 的使用建議
■ Checkpoint 間隔不要太短
雖然理論上 Flink 支持很短的 checkpoint 間隔,但是在實際生產(chǎn)中,過短的間隔對于底層分布式文件系統(tǒng)而言,會帶來很大的壓力。另一方面,由于檢查點的語義,所以實際上 Flink 作業(yè)處理 record 與執(zhí)行 checkpoint 存在互斥鎖,過于頻繁的 checkpoint,可能會影響整體的性能。當然,這個建議的出發(fā)點是底層分布式文件系統(tǒng)的壓力考慮。
■ 合理設(shè)置超時時間
默認的超時時間是 10min,如果 state 規(guī)模大,則需要合理配置。最壞情況是分布式地創(chuàng)建速度大于單點(job master 端)的刪除速度,導致整體存儲集群可用空間壓力較大。建議當檢查點頻繁因為超時而失敗時,增大超時時間。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Flink State 最佳实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Gartner 容器报告:阿里云与 AW
- 下一篇: 基于X-Engine引擎的实时历史数据库