Flink 中的木桶效应:单个 subtask 卡死导致整个任务卡死
Flink 從入門到精通?系列文章
工作或者面試中一般都要求面試者有較強的獨立解決問題的能力,解決問題的前提是:我們對相應組件的原理非常清楚。本文先講述原理,再結(jié)合實戰(zhàn)分析一個線上任務的異常案例。
本文分以下幾個部分:
第一部分直接給出結(jié)論
第二部分會分析原理:Flink 中單個 subtask 卡死,為什么會導致整個任務卡死?
第三部分:線上業(yè)務如果出現(xiàn)類似問題如何定位?換言之,線上出現(xiàn)哪些現(xiàn)象可以說明是單個 subtask 導致整個任務卡住了。會通過案例結(jié)合 Metric jstack 等定位問題的根本原因。
第四部分對 Flink 現(xiàn)有物理分區(qū)策略的思考
第五部分總結(jié)
1、結(jié)論
keyBy 或 rebalance 下游的算子,如果單個 subtask 完全卡死,會把整個 Flink 任務卡死
通過反壓可以確定哪個 Task 出現(xiàn)性能瓶頸
通過 inPoolUsage 指標可以確定下游 Task 的哪個 Subtask 出現(xiàn)性能瓶頸
Flink 現(xiàn)有的物理分區(qū)策略全是靜態(tài)的負載均衡策略,沒有動態(tài)根據(jù)負載能力進行負載均衡的策略
2、原理分析
2.1 分析一個簡單的 Flink 任務
如下圖所示,任務由 Source → map → KeyBy → Sink 四個算子組成。其中 keyBy 和 Sink 算子之間存在 shuffle,圖中相同顏色的箭頭表示到達 Sink 中相同的 subtask。其中 subtask A0 和 A1 都要給 TaskB 的 3 個 subtask 發(fā)送數(shù)據(jù)。
任務執(zhí)行圖2.2 任務運行過程中,具體的數(shù)據(jù)傳輸過程
如下圖所示,上游每個 Subtask 中會有 3 個 resultSubPartition,連接下游算子的 3 個 subtask。下游每個 subtask 會有 2 個 InputChannel,連接上游算子的 2 個 subtask。在正常運行過程中如果沒有反壓,所有的 buffer pool 是用不完的。就像下圖一樣,所有的 InputChannel 并沒有占滿,公共的 buffer pool 中也幾乎沒有數(shù)據(jù)。
正常的數(shù)據(jù)傳輸2.3 Subtask B0 卡死后數(shù)據(jù)傳輸發(fā)生的現(xiàn)象
假設由于某些原因 Subtask B0 長時間地處理非常慢甚至卡死,其他的 Subtask 都正常,會出現(xiàn)下圖中的現(xiàn)象。
下游其中一個 subtask 反壓嚴重2.3.1 現(xiàn)象描述
1、Subtask B0 內(nèi)的 A0 和 A1 兩個 InputChannel 會被占滿
2、Subtask B0 公共的 BufferPool 中可申請到的空間也被占滿
3、Subtask A0 和 A1 的 B0 ResultSubPartition 被占滿
4、Subtask A0 和 A1 公共的 BufferPool 中可申請到的空間也被占滿
5、Subtask B1 和 B2 的所有 InputChannel 和 BufferPool 都是空的
6、Subtask A0 和 A1 的 B1、B2 ResultSubPartition 都是空的
2.3.2 現(xiàn)象解釋 ☆☆☆☆☆
Subtask B0 卡死了,不再處理數(shù)據(jù)或者處理的超級慢。上游如果一直給 Subtask B0 發(fā)送數(shù)據(jù),必然會導致 Subtask B0 的所有 InputChannel 占滿,最后導致公共的 BufferPool 中可申請到的空間也被占滿。也就是現(xiàn)象中的 1、2 兩點。
雖然 Subtask B0 的所有 Buffer 占滿后,Subtask A0 和 A1 仍然在生產(chǎn)數(shù)據(jù),此時必然不能發(fā)送數(shù)據(jù)到 B0,所以就會把 Subtask A0 和 A1 中 Subtask B0 對應的 buffer 給占滿(也就是 Flink 中反壓傳遞的過程),最后再把 Subtask A0 和 A1 公共的 BufferPool 中可申請到的空間也占滿。也就是現(xiàn)象中的 3、4 兩點。
其中 1、2、3、4 這四點比較容易理解,關(guān)鍵是 5、6 兩點,即:Subtask B0 卡死會什么導致 Subtask B1 和 B2 完全沒有數(shù)據(jù)了?
Subtask B1 和 B2 在整個上下游的 buffer 都是空的,理論來講只要有空余的 buffer,就可以用來傳輸數(shù)據(jù)。但實際上并沒有將 Subtask A0 和 A1 的數(shù)據(jù)傳輸給 Subtask B1 和 B2。
「這里的根本原因是:Subtask A0 和 A1 的主線程完全卡死壓根不會生產(chǎn)數(shù)據(jù)了。」
既然不會生產(chǎn)數(shù)據(jù)了,那么 Subtask A0 和 A1 的下游就算 buffer 空著,也是沒有意義的。所以就出現(xiàn) 5、6 的現(xiàn)象。
重點解釋:為什么 Subtask A0 和 A1 的主線程會卡死?A0 和 A1 是一樣的,下面單獨分析 A0。Subtask A0 處理數(shù)據(jù)流程圖如下所示:
Subtask A0 處理數(shù)據(jù)流程Subtask A0 的主線程會從上游讀取數(shù)據(jù)消費,按照數(shù)據(jù)的 KeyBy 規(guī)則,將數(shù)據(jù)發(fā)送到 B0、B1、B2 三個 outputBuffer 中。現(xiàn)在我們可以看到 B0 對應的 buffer 占滿了,且 B0 在公共的 BufferPool 中可申請到的空間也被占滿。現(xiàn)在主線程在處理數(shù)據(jù),假設這條數(shù)據(jù)根據(jù) KeyBy 分區(qū)規(guī)則后,應該分配給 Subtask B0 處理,那么主線程必須把數(shù)據(jù)放到 B0 這個 buffer 中。但是現(xiàn)在 buffer 沒有空間了,所以主線程就會卡在申請 buffer 上,直到可以再申請到 buffer(這也是 Flink 反壓的實現(xiàn)原理)。
同理 Subtask A1 也會出現(xiàn)這樣的問題,如果 Task A 的并行度是 1000,那么 Subtask B0 也會將上游 1000 個 Subtask A 全部卡住。最后導致整個任務全部卡住。
原理弄懂了,下一階段要搞懂線上出現(xiàn)哪些現(xiàn)象可以說明是單個 subtask 導致整個任務卡住了。線上業(yè)務如果出現(xiàn)類似問題如何定位?
2.4 小結(jié)
其實不只是 keyBy 場景會出現(xiàn)上述問題,rebalance 場景也會出現(xiàn)上述問題。rebalance 分區(qū)策略表示,上游 subtask 以輪詢的策略向下游所有 subtask 發(fā)送數(shù)據(jù),即:subtask A0 會先給 subtask B0 發(fā)一條,下一條發(fā)給 B1,下一條再發(fā)給 B2,再發(fā)給 B0 依次類推:B0、B1、B2、B0、B1、B2、B0、B1、B2。。。
一旦 B0 卡死,最終主線程肯定因為 B0 把 Subtask A 內(nèi)的 buffer 用完了,導致主線程卡住。
「所以總結(jié)成一句話就是:keyBy 或 rebalance 下游的算子,如果單個 subtask 完全卡死,會把整個 Flink 任務卡死。」
3、問題定位過程
3.1 業(yè)務場景
業(yè)務反饋一個寫 ES 的任務跑一會就沒輸出了,完全卡死,一條輸出都沒有。80 并發(fā)完全正常,可以正常輸出,調(diào)大并發(fā)到 100 以后,運行一會就沒有輸出了。
DAG 圖如下所示,上下游算子之間的數(shù)據(jù)分區(qū)策略是 rebalance。
任務 DAG3.2 思考及定位過程
聽到業(yè)務方的反饋,看到作業(yè) DAG 圖,開始定位問題,筆者并沒有想到第二部分那么多的原理分析,因為大部分的任務卡住并不是因為單個 Subtask 卡住導致整個任務卡住。所以下面的定位過程完全是以一個旁觀者的角度觸發(fā),也是筆者當時定位問題的一個完整過程。筆者作為平臺方,也是完全不清楚業(yè)務邏輯的,只是從 Flink 的角度來定位問題。
3.2.1 從 DAG 上來看任務有兩個 Task,到底是上游 Task 有問題還是下游 Task 有問題
如何定位上游 Task 還是下游有問題很簡單:看一下上游 Task 是否有反壓,如果下游 Task 卡死或者消費慢,上游 Task 肯定反壓比較嚴重。所以判定依據(jù):
如果上游 Task 反壓嚴重,則表示下游 Task 有問題
如果上游 Task 沒有反壓,大概率是上游 Task 有問題
查看后,發(fā)現(xiàn)上游 Task 的所有 Subtask 反壓都非常嚴重,所以斷定下游 Task 有問題。
3.2.2 下游 Task 發(fā)生了什么?在干嘛?
要想知道下游 Task (ES Sink)在干嘛,很簡單:查看現(xiàn)場,隨便選一個 Subtask 打個 jstack,看看當前進程在做什么。
下游 Task 總共 100 個并行度,隨便找了一個 Subtask 打 jstack,發(fā)現(xiàn)當前 Subtask 處理數(shù)據(jù)的主線程卡在 poll 數(shù)據(jù)。即:ES Sink 的當前 Subtask 不輸出數(shù)據(jù)竟然是因為上游不發(fā)送數(shù)據(jù)了。為了確認當前 Subtask 接收不到上游算子發(fā)送的數(shù)據(jù),又看了當前 Subtask 的 Metric:inPoolUsage。inPoolUsage 表示當前算子輸入緩沖區(qū)的使用率,inPoolUsage 持續(xù)為 0 證實了當前 Subtask 確實接收不到上游發(fā)送的數(shù)據(jù)。
讀者在這里是不是開始懷疑了,是不是上游出問題導致整個任務的下游都接收不到數(shù)據(jù)?
答:不可能。如果上游 Task 出問題,所有下游 Subtask 都是正常的,都在接收上游發(fā)送數(shù)據(jù),那么上游算子的 buffer 肯定是空的,怎么可能出現(xiàn)反壓。所以上游算子反壓嚴重必然是下游算子處理性能不行。
到這里,經(jīng)過上述一步步推導,才開始想本文第二部分那些原理分析:是不是下游 Task 有某幾個 Subtask 卡住了,導致整個任務卡住了。問題來了:怎么找出下游 Task 那幾個可能卡住的 Subtask?
3.2.3 怎么找出下游 Task 那幾個可能卡住的 Subtask?
本文第二部分分析過,如果下游 Task 某幾個 Subtask 卡死,那么這幾個 Subtask 的 inputBuffer 會被占滿,且其他的 Subtask inputBuffer 全為空。所以我們只需要找出下游哪幾個 Subtask 的 inputBuffer 占滿了,也就是出現(xiàn)卡頓的 Subtask。
此時需要 Flink 強大的 Metric,Flink 的 Metric 可以看到下游 Task 所有 Subtask 的 inPoolUsage,Flink 的 Web UI 可以看到 Metric 項。下游 Task 有 100 個并行度,即:對應 100 個 Subtask。最笨的方式是分別查看 100 個 Subtask 的 inPoolUsage 指標。
高效的方式是構(gòu)建好整個 Flink 的 Metric 系統(tǒng),通過 Metric Report 將各種指標收集到外部存儲系統(tǒng),用 Grafana 或其他可視化工具展示。此時根據(jù) Top N 的方式去查詢即可。按照 subtask_id 分組,對 inPoolUsage 排降序,找出 inPoolUsage 最高的幾個 subtask_id。
利用上述方案發(fā)現(xiàn)只有一個 subtask 的 inPoolUsage 為 1,其余的 subtask 的 inPoolUsage 都為 0。此時就可以得出結(jié)論了:確實是因為單個 Subtask 導致整個任務卡住了。
3.2.4 解決方案
定位到具體的 Subtask,jstack 發(fā)現(xiàn)該 Subtask 的主線程卡在了 ES Sink 的某一處代碼。具體 ES 的問題這里不分析了,是 ES 客戶端的 bug 導致卡死。查看了 ES 的相關(guān) issue 將代碼合入后問題最終解決。
3.2.5 小結(jié)
第三部分主要是從 Flink 的角度出發(fā),使用一種通用的方法論來定位到任務被卡死的真正原因。定位問題需要有全面的理論支撐結(jié)合強大的 Metric 系統(tǒng)輔助定位問題。
4、 關(guān)于 Flink 物理分區(qū)策略的思考
Flink 支持的物理分區(qū)策略
Flink 的物理分區(qū)策略支持多種,包括:partitionCustom、shuffle、rebalance、rescale、broadcast。具體參考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/#physical-partitioning。
其實前四種分區(qū)策略都可以認為是一種負載均衡策略,上游算子 n 個并行度,下游算子 m 個并行度,如何將上游 n 個 Subtask 的數(shù)據(jù)打散到下游 m 個 Subtask 呢?
partitionCustom 表示自定義分區(qū)策略,根據(jù)用戶自定義的分區(qū)策略發(fā)送數(shù)據(jù)
shuffle 表示隨機的策略實現(xiàn)負載均衡。
rebalance 表示輪詢策略。
rescale 是對 rebalance 策略的優(yōu)化。引用官網(wǎng) rescale 圖示,相比 rebalance 而言,使用 rescale 策略時,上游 Subtask 只會給下游某幾個 Subtask 發(fā)送數(shù)據(jù)。大大減少數(shù)據(jù)傳輸時邊的個數(shù)。
如下圖所示,Source 有兩個 Subtask,Map 有 6 個 Subtask,則一個 Source 的 Subtask 固定給 3 個 Map Subtask 發(fā)送數(shù)據(jù)。
如果是 rebalance,每個 Source 都會給所有的 Map Subtask 發(fā)送數(shù)據(jù)。
Flink 欠缺的一種負載均衡策略
上述幾種物理分區(qū)策略都是靜態(tài)的,而不是動態(tài)的。如下圖所示是 rebalance shuffle 圖示,上游 Task A 的所有 Subtask 要發(fā)送數(shù)據(jù)給下游 Task B 的所有 Subtask。假設 Subtask B0 沒有卡死,但是由于資源競爭等原因,Subtask B0 的吞吐比 B1 和 B2 要差。但是 rebalance 是嚴格的輪詢策略,所以上游給 Subtask B0、B1、B2 發(fā)送的數(shù)據(jù)量完全一致。最后 B0 就會拖慢整個任務的吞吐量,B1 和 B2 也不能發(fā)揮出自己真正的性能。
rebalance shuffle對于這種問題,常用的負載均衡策略并不是使用隨機或者輪詢策略,而是上游發(fā)送數(shù)據(jù)時會檢測下游的負載能力,根據(jù)不同的負載能力,給下游發(fā)送不同的數(shù)據(jù)量。假設下游 Subtask B1 和 B2 吞吐量高于 B0,那么上游 Subtask A 會多給 B1 和 B2 發(fā)送一些數(shù)據(jù),少給 B0 發(fā)送一些數(shù)據(jù)。
該策略可以解決 rebalance 策略導致的木桶效應。但該策略不能解決 KeyBy 的場景,因為 KeyBy 策略嚴格決定了每條數(shù)據(jù)要發(fā)送到下游哪個 Subtask。
5、總結(jié)
再次回顧第一部分的結(jié)論:
keyBy 或 rebalance 下游的算子,如果單個 subtask 完全卡死,會把整個 Flink 任務卡死
通過反壓可以確定哪個 Task 出現(xiàn)性能瓶頸
通過 inPoolUsage 指標可以確定下游 Task 的哪個 Subtask 出現(xiàn)性能瓶頸
Flink 現(xiàn)有的物理分區(qū)策略全是靜態(tài)的負載均衡策略,沒有動態(tài)根據(jù)負載能力進行負載均衡的策略
總結(jié)
以上是生活随笔為你收集整理的Flink 中的木桶效应:单个 subtask 卡死导致整个任务卡死的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 技术项目 - Linux网卡中断使单个C
- 下一篇: 51 nod 1431 快乐排队