Flink Checkpoint 机制:如何保证 barrier 和数据之间不乱序?
Flink Checkpoint 機制:如何保證 barrier 和數據之間不亂序?
1 前言
1.1 什么是 state?
要說 checkpoint,首先要從 state 聊起。之前有被問到對于 Flink state 的理解,state 的字面含義就是狀態。所謂狀態,它本身不難理解,簡單的說,state 就是你在處理事件的時候需要保存的狀態信息。
舉個例子,如果你要計時,就要保存開始時間,然后用結束時間減去開始時間,這里的“開始時間”就是先前的狀態。
Flink 官方對 state 也有準確的解釋:
State in Streaming Applications
Simply put, state is the information that you need to remember across events. Even the most trivial streaming applications are typically stateful because of their need to “remember” the exact position they are processing data from, for example in the form of a Kafka Partition Offset or a File Offset. In addition, many applications hold state internally as a way to support their internal operations, such as windows, aggregations, joins, or state machines.
1.2 為什么要有 checkpoint?
理解了 state 之后,checkpoint 是什么呢?
Flink 作為一個運行在成百上千臺機器上的分布式計算引擎,當初被設計的初衷,就是人們想辦法利用一大票便宜的PC機,通過一頓猛如虎的數學操作,來自己構建一個宏觀上更強性能、更高計算能力的計算機,去替換掉昂貴的小型機、大型機。
正如眾多分布式系統的共同點那樣,組件失效 / 機器故障應該被視為常態,而不是意外事件。成百上千的廉價機器構成的節點相互訪問、交換數據,無論從數量還是質量上,都很難保證時刻的正常運轉,所以需要一種機制,讓它們可以自動從失敗狀態恢復過來。因此,持續備份,容錯,以及自動恢復這些特性,必須集成到這個計算引擎當中。
實際上,早在 MapReduce 模型被提出的時候,就已經設計了它自己的容錯機制,后來隨著數據特點的變化(批/流),以及理論和技術的不斷發展,到了 Flink 時期,它在現有容錯機制的基礎上進行改進,提出了 checkpoint 機制以及一些針對不同場景的優化版本(取決于你選擇 at least once 還是 exactly once 語義)。
說人話就是,通過在不同時點以快照的形式保存當前的狀態,來方便故障時的快速恢復,避免了全量的重新計算帶來的巨大成本。
在 Apache Flink?: Stream and Batch Processing in a Single Engine 中,有提到過它 checkpoint 的實現:
3.3 Fault Tolerance
The mechanism used in Flink is called Asynchronous Barrier Snapshotting (ABS [7]). Barriers are control records injected into the input streams that correspond to a logical time and logically separate the stream to the part whose effects will be included in the current snapshot and the part that will be snapshotted later. An operator receives barriers from upstream and first performs an alignment phase, making sure that the barriers from all inputs have been received. Then, the operator writes its state (e.g., contents of a sliding window, or custom data structures) to durable storage (e.g., the storage backend can be an external system such as HDFS). Once the state has been backed up, the operator forwards the barrier downstream. Eventually, all operators wil register a snapshot of their state and a global snapshot will be complete.
For example, in Figure 5 we show that snapshot t2 contains all operator states that are the result of consuming all records before t2 barrier. ABS bears resemblances to the Chandy-Lamport algorithm for asynchronous distributed snapshots [11]. However, because of the DAG structure of a Flink program, ABS does not need to checkpoint in-flight records, but solely relies on the aligning phase to apply all their effects to the operator states. This guarantees that the data that needs to be written to reliable storage is kept to the theoretical minimum (i.e., only the current state of the operators).
翻譯過來就是,Flink 中使用的機制被稱為異步障礙快照( Asynchronous Barrier Snapshotting)。障礙是注入進輸入流的控制記錄,它對應邏輯時間,然后邏輯上區分兩個部分:影響到當前快照的一部分流和其他部分。
一個算子從上游數據接收到 barrier,然后先執行一次 alignment,確保所有輸入到 barrier 的數據都被接收到。接下來,算子將它的狀態(滑動窗口的內容 / 自定義的數據結構)到持久化存儲(可以是一個外部系統,如 HDFS)。狀態備份好了之后,算子則繼續將 barrier 轉發到下游。最終,所有的算子都會注冊它們的狀態快照,這樣,一個全部的快照則完成了。
讀到這里,我有一個疑問了:我們知道,Flink 無法保證數據之間的順序。那么有沒有可能在 barrier 之前的數據,在傳輸的過程中,由于順序被打亂,來到了 barrier 之后?例如,1 2 3 barrier a b c 在傳輸的過程中,會不會變成 1 2 3 a barrier b c?這種情況下,barrier 不就無法正確地確定 checkpoint 的時機了嗎?
后來有同學在公司內部分享 Flink 的時候,我提出了這個疑問,分享人也表示沒有考慮過這個問題。于是,我打算一探究竟,搞它!
2 在 Flink 中,數據數據亂序的原因是什么?
2.1 首先,數據為什么會亂序?
所謂數據的亂序,是對業務上來說的。在業務開發人員看來,在一個算子中,會出現這樣一種現象:用戶的上車記錄先于用戶的下車記錄到達。這個結果看起來使人疑惑,但如果了解背景的話就可以很快想到,這是由 kafka 的多 partition 以及 Flink 的多 parallelism 帶來的,這也是當我們想要保證數據順序的時候面臨的兩個困難。
2.2 那么在業務中,怎么解決亂序問題呢?
實際上,在很多情況下,我們并不是想給數據做一個嚴格的“排序”——數據順序不是最終目的,而是達成目的的方式。既然是方式,就不具有唯一性。我們想要保證順序,是因為順序隱含了因果(causual)關系,而因果關系才是我們想要的。因此,為了保證順序,我們引入了 timestamp 的概念,所以有了所謂的 Flink 的三種時間(發生時間、到達時間、攝入時間),開發人員可以通過這些時間戳,來推理出事件之間的真正因果關系,和 barrier 關系不大,在這里就不展開討論了。
3 checkpoint 被觸發的時機,會受數據亂序的影響嗎?
同樣地,在 barrier 場景下,我們沒有必要去花費大力氣來得到具有嚴格順序的數據。因為每一個 barrier 的作用,僅僅是把數據流切分為兩部分:barrier 之前、barrier 之后。
回到原來的問題,1 2 3 barrier a b c 在傳輸的過程中,會不會變成 1 2 3 a barrier b c?
要想回答這個問題,要知道 barrier 的運行機制。可以參考 State Management in Apache Flink?: Consistent Stateful Distributed Stream Processing ,比較長,里面有一些數學推理,沒太看懂,我就用大白話說一下我的理解吧。
首先,這個算法有一系列的假設,其中很重要的一條是 data channel 的 FIFO 特性:
3.2.2 Main Assumptions
Directional data channels between tasks are reliable, respect FIFO delivery and can be blocked or unblocked. When a channel is blocked, in-transit messages are internally buffered (and possibly spilled to disk) and can be delivered on that end once it unblocks.
意思是說,算法有個基本假設,即我們在數據傳輸時的管道是 FIFO 的。你也許會想,這和前面說的“亂序”不是矛盾了嗎?
再一想,并沒有矛盾,因為 FIFO 指的是兩個節點(一個發送方,一個接收方)之間的傳輸過程,而亂序現象是由多個節點(多個發送方,一個接收方)造成的。
好,現在 FIFO 的假設是成立的。那么這個算法是如何解決多個節點導致的亂序問題呢?
用下面的 Figure 3 舉個例子,在圖中,不同的 barrier(圖上稱為 markers) 將整個數據流分成了不同的顏色。可以看到有 t1 t2 t3 t4 t5 節點,我們可以想象成 5 個 TaskManager。這些 TaskManager 通過一種 alignment 機制,當收到一種類型的 barrier 時,會等待所有上游數據源的 barrier 都到達自己,相當于進行了一個同步的過程,之后才進行持久化存儲 state,然后將 barrier 發送給下游。
至于 alignment 機制,可以看下面這張圖,我把它簡單的理解為一種對齊機制。因為對于每個 TaskManager 來說,可能有多個數據源,所以在進行下一步操作之前,我們需要存儲這一步的狀態,而所謂對齊,就是等待所有的數據源都發來當前 term 的 barrier,這時候由于我們之前有過 FIFO 的保證,所以拿到所有數據源的 barrier 之后,我們可以認為當前我們處于一個完整的 state,下一步就是存儲當前 state 的快照,即 checkpoint。
所以說,此亂序非彼亂序。由于存在 FIFO 的保證,barrier 之前的數據不會在傳輸的過程中跑到 barrier 之后去;而由多輸入源帶來的數據的亂序,因為有 alignment 機制的保證,并不會影響通過 barrier 實現的 checkpoint 的準確性。
4 Reference
State Management in Apache Flink?: Consistent Stateful Distributed Stream Processing
Apache Flink?: Stream and Batch Processing in a Single Engine
Stateful Stream Processing
From Aligned to Unaligned Checkpoints - Part 1: Checkpoints, Alignment, and Backpressure
最后,筆者才疏學淺,各位如果有任何想說的,或者覺得本文有疏漏以及一些不嚴謹的地方,歡迎留言一起探討!
總結
以上是生活随笔為你收集整理的Flink Checkpoint 机制:如何保证 barrier 和数据之间不乱序?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Purpose of cmove ins
- 下一篇: leetcode 813. Larges