Apache Flink fault tolerance源码剖析(六)
上篇文章我們分析了基于檢查點的用戶狀態的保存機制——狀態終端。這篇文章我們來分析barrier(中文常譯為柵欄或者屏障,為了避免引入名稱爭議,此處仍用英文表示)。檢查點的barrier是提供exactly once一致性保證的主要保證機制。這篇文章我們會就此展開分析。
這篇文章我們側重于核心代碼分析,原理我們在這個系列的第一篇文章《Flink數據流的Fault Tolerance機制》
一致性保證
Flink的一致性保證也依賴于檢查點機制。在利用檢查點進行恢復時,數據流會進行重放(replay)。對于有狀態的operation以及function,Flink定義了檢查點支持的兩種模式(CheckpointingMode):
- EXACTLY_ONCE
- AT_LEAST_ONCE
在定義該枚舉時,還對這兩個枚舉值進行了詳細的解釋:
EXACTLY_ONCE
這種模式意味著系統將以如下語義對operator和udf(user defined function)進行快照:在恢復時,每條記錄將在operator狀態中只被重現/重放一次。
例如,如果有一個用戶在一個流中應用統計元素個數的函數,該統計結果將總是跟流中的元素的真實個數一致,不管是失敗還是恢復。
需要注意的是,這并不意味著每個數據流過streaming data flow僅僅一次。它表示的是在恢復進行時,operators/functions的狀態被恢復(通過檢查點關聯的狀態),使得被恢復的數據流在其狀態最后一次修改之后(最新的檢查點)被恰好獲取一次。
并且,這里的EXACTLY_ONCE模式也并不保證Flink在跟外部系統交互時的行為也滿足EXACTLY_ONCE的一致性保證(Flink只保證自己的operator以及function的狀態)。雖然,通常要求在兩個系統之間都達到一致性保證,但我們可以通過實現連接器來達到這樣的要求(比如Apache Kafka的offset可以實現這個需求)。
這種模式可以支撐高吞吐,取決于數據流圖以及操作,這種模式可能會增加記錄處理的延遲,因為operator需要對齊他們的輸入流,來保證創建一個一致的快照點。對于沒有進行重新分區的簡單數據流,這些延遲的增加是可以忽略不計的,而對于進行了重新分區的簡單數據流,延遲的平均值很小,但最慢的記錄通常有一個明顯的延遲。
AT_LEAST_ONCE
這個模式意味著系統將以一種更簡單地方式來對operator和udf的狀態進行快照:在失敗后進行恢復時,在operator的狀態中,一些記錄可能會被重放多次。
例如,如果有一個用戶函數用來統計流中的元素個數,在失敗后恢復時,統計值將等于或者大于流中元素的真實值。
這種模式對延遲產生的影響很小,通常應用于接收低延遲并且容忍重復消息的場景。
barrier定義
checkpoint barriers用來在流拓撲中對齊檢查點。
單個數據流視角,barrier示意:
分布式多input channel視角,barrier示意圖:
該圖演示的是多barrier aligning(對齊),但只有EXACTLY_ONCE一致性時才會要求這一點
JobManager將指示source發射barriers。當某個operator從其輸入中接收到一個CheckpointBarrier,它將會意識到當前正處于前一個檢查點和后一個檢查點之間。一旦某operator從它的所有input channel中接收到checkpoint barrier。那么它將意識到該檢查點已經完成了。它可以觸發operator特殊的檢查點行為并將該barrier廣播給下游的operator。
checkpoint barrier的ID是嚴格單調增長的。
CheckpointBarrier在Flink中被看做一個運行時事件(繼承自RuntimeEvent類)以區分普通的數據流數據(buffer),Flink中的運行時事件必須支持序列化并且可以在TaskManager之間互相通信。CheckpointBarrier只有兩個屬性:id以及timestamp。
barrier處理器
CheckpointBarrierHandler定義了響應來自input channel中的barrier的處理機制,它是提供一致性保證的核心。
Flink給出了兩個實現,分別是:元素阻塞緩存機制以及barrier跟蹤機制。
兩個關鍵接口方法:
- getNextNonBlocked :返回operator可能消費的下一個BufferOrEvent。這個調用會導致阻塞直到獲取到下一個BufferOrEvent,如果流已經完成,那么就返回null。
- registerCheckpointEventHandler : 注冊一個事件回調,用來在檢查點成功完成時執行。
BarrierBuffer
BarrierBuffer用于提供EXACTLY_ONCE一致性保證,其行為是:它將以barrier阻塞輸入直到所有的輸入都接收到基于某個檢查點的barrier,也就是上面所說的對齊。
為了避免背壓輸入流(這可能導致分布式的死鎖),BarrierBuffer將從被阻塞的channel中持續地接收buffer并在內部存儲它們,直到阻塞被解除。
getNextNonBlocked
getNextNonBlocked方法用于獲取待operator處理的下一條(非阻塞)的記錄。該方法以多種機制阻塞當前調用上下文,直到獲取到下一個非阻塞的記錄。
這里理解這個非阻塞非常重要,兩種類型的記錄是所謂的非阻塞的記錄,一種是來自于上流未被標記為blocked channel輸出的數據記錄;另一種是,從已被阻塞了的緩沖區隊列中激活了的緩沖區中提取出的數據記錄。
這里以多種機制相結合來造成對當前調用的阻塞,直到獲取到滿足上面提及的非阻塞的記錄,多種機制分別是:
- while(true)重復調用
- inputGate.getNextBufferOrEvent方法本身的阻塞調用
- 以及遞歸調用當前方法
還需要理解這里的返回值BufferOrEvent,因為barrier混入在數據流中,所以獲取到的數據可能是正常的數據流Buffer,也可能是某種特殊的Event,比如這里的barrier
分析一下getNextNonBlocked方法的實現
public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {while (true) {// process buffered BufferOrEvents before grabbing new ones//獲得下一個待緩存的buffer或者barrier事件BufferOrEvent next;//如果當前的緩沖區為null,則從輸入端獲得if (currentBuffered == null) {next = inputGate.getNextBufferOrEvent();}//如果緩沖區不為空,則從緩沖區中獲得數據else {next = currentBuffered.getNext();//如果獲得的數據為null,則表示緩沖區中已經沒有更多地數據了if (next == null) {//清空當前緩沖區,獲取已經新的緩沖區并打開它completeBufferedSequence();//遞歸調用,處理下一條數據return getNextNonBlocked();}}//獲取到一條記錄,不為nullif (next != null) {//如果獲取到得記錄所在的channel已經處于阻塞狀態,則該記錄會被加入緩沖區if (isBlocked(next.getChannelIndex())) {// if the channel is blocked we, we just store the BufferOrEventbufferSpiller.add(next);}//如果該記錄是一個正常的記錄,而不是一個barrier(事件),則直接返回else if (next.isBuffer()) {return next;}//如果是一個barrierelse if (next.getEvent().getClass() == CheckpointBarrier.class) {//并且當前流還未處于結束狀態,則處理該barrierif (!endOfStream) {// process barriers only if there is a chance of the checkpoint completingprocessBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());}}else {//如果它是一個事件,表示當前已到達分區末尾if (next.getEvent().getClass() == EndOfPartitionEvent.class) {//以關閉的channel計數器加一numClosedChannels++;// no chance to complete this checkpoint//此時已經沒有機會完成該檢查點,則解除阻塞releaseBlocks();}//返回該事件return next;}}//next 為null 同時流結束標識為falseelse if (!endOfStream) {// end of stream. we feed the data that is still buffered//置流結束標識為trueendOfStream = true;//解除阻塞,這種情況下我們會看到,緩沖區的數據會被加入隊列,并等待處理releaseBlocks();//繼續獲取下一個待處理的記錄return getNextNonBlocked();}else {return null;}}}processBarrier
該方法用于處理barrier,也是分析的重點。
//獲取接收到得barrier的ID //接收到的barrier數目 > 0 ,說明當前正在處理某個檢查點的過程中 if numBarriersReceived > 0 //當前檢查點的某個后續的barrierIdif barrierId == currentCheckpointId //處理barrieronBarrier(channelIndex);//barrierId > 當前檢查點Idelse if barrierId > currentCheckpointId //當前的檢查點已經沒有機會完成了,則解除阻塞releaseBlocks(); //跳過當前檢查點,直接進入該barrier對應的檢查點currentCheckpointId = barrierId; //處理barrier onBarrier(channelIndex);else//忽略終止的檢查點的barrier,barrierId < currentCheckpointIdreturn; //接收到的barrier數目等于0且barrierId > currentCheckpointId else if (barrierId > currentCheckpointId) //說明這是一個新檢查點的初始barriercurrentCheckpointId = barrierId;onBarrier(channelIndex); //忽略之前(跳過的)檢查點的未處理的barrier else return;另一段處理接收到所有barrier的邏輯:
//接收到barriers的數目 + 關閉的channel的數目 = 輸入channel的總數目 if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels)//觸發檢查點處理器回調事件checkpointHandler.onEvent(receivedBarrier);releaseBlocks(); //解除阻塞onBarrier
將barrier關聯的channel標識為阻塞狀態同時將barrier計數器加一。代碼:
private void onBarrier(int channelIndex) throws IOException {if (!blockedChannels[channelIndex]) {blockedChannels[channelIndex] = true;numBarriersReceived++;if (LOG.isDebugEnabled()) {LOG.debug("Received barrier from channel " + channelIndex);}}else {throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");}}releaseBlocks
解除所有channel的阻塞,并確保剛剛寫入的數據(buffer)被消費。
首先是重置狀態標識:
for (int i = 0; i < blockedChannels.length; i++) {////將所有channel的阻塞標識置為falseblockedChannels[i] = false; } ////將接收到的barrier累加值重置為0 numBarriersReceived = 0;接下來,
//如果當前的緩沖區中的數據為空 if (currentBuffered == null) {// common case: no more buffered data//初始化新的緩沖區讀寫器currentBuffered = bufferSpiller.rollOver();//打開緩沖區讀寫器if (currentBuffered != null) {currentBuffered.open();} } else {// uncommon case: buffered data pending// push back the pending data, if we have any// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one//緩沖區中還有數據,則初始化一塊新的存儲空間來存儲新的緩沖數據BufferSpiller.SpilledBufferOrEventSequence bufferedNow = bufferSpiller.rollOverWithNewBuffer();if (bufferedNow != null) {//打開新的緩沖區讀寫器bufferedNow.open();//將當前沒有處理完的數據加入隊列中queuedBuffered.addFirst(currentBuffered);//將新開辟的緩沖區讀寫器置為新的當前緩沖區currentBuffered = bufferedNow;} }BarrierTracker
BarrierTracker會對各個input channel接收到的檢查點的barrier進行跟蹤。一旦它觀察到某個檢查點的所有barrier都已經到達,它將會通知監聽器檢查點已完成,以觸發相應地回調處理。
不像BarrierBuffer,BarrierTracker不阻塞已經發送了barrier的input channel,所以它不能提供exactly-once的一致性保證。但是它可以提供at least once的一致性保證。
這里不阻塞input channel,也就說明不采用對齊機制,因此本檢查點的數據會及時被處理,并且因此下一個檢查點的數據可能會在該檢查點還沒有完成時就已經到來。所以,在恢復時只能提供AT_LEAST_ONCE保證。
getNextNonBlocked
還是來重點觀察getNextNonBlocked方法:
public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {while (true) {//從輸入中獲得數據,該操作將導致阻塞,直到獲得一條記錄BufferOrEvent next = inputGate.getNextBufferOrEvent();//null表示沒有數據了if (next == null) {return null;}//這是跟BarrierBuffer的關鍵差別,只要它不是一個barrier,就直接返回//不管BufferOrEvent對應的channel是否已處于阻塞狀態,這里不存在緩存數據的做法,直接返回else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {return next;}else {//如果是barrier,則進入barrier的處理邏輯processBarrier((CheckpointBarrier) next.getEvent());}}}processBarrier
處理barrier依賴于一個內部數據結構CheckpointBarrierCount,該類用來對某個檢查點的barrier做統計。
private void processBarrier(CheckpointBarrier receivedBarrier) {// fast path for single channel trackers//首先判斷特殊情況:當前operator是否只有一個input channel//如果是,那么就省略了統計的步驟,直接觸發barrier handler回調if (totalNumberOfInputChannels == 1) {if (checkpointHandler != null) {checkpointHandler.onEvent(receivedBarrier);}return;}// general path for multiple input channels//判斷通常狀態:當前operator存在多個input channelfinal long barrierId = receivedBarrier.getId();// find the checkpoint barrier in the queue of bending barriers//所有未完成的檢查點都存儲在一個隊列里,需要找到當前barrier對應的檢查點CheckpointBarrierCount cbc = null;int pos = 0; //對應的檢查點在隊列中對應的位置for (CheckpointBarrierCount next : pendingCheckpoints) {//如果找到則跳出循環if (next.checkpointId == barrierId) {cbc = next;break;}//沒找到位置加一pos++;}//最終找到了對應的未完成的檢查點if (cbc != null) {// add one to the count to that barrier and check for completion//將barrier計數器加一int numBarriersNew = cbc.incrementBarrierCount();//如果barrier計數器等于input channel的總數if (numBarriersNew == totalNumberOfInputChannels) {// checkpoint can be triggered// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)//移除pos之前的所有檢查點(檢查點在隊列中得先后順序跟檢查點的時序是一致的)for (int i = 0; i <= pos; i++) {pendingCheckpoints.pollFirst();}// notify the listener//觸發檢查點處理器事件if (checkpointHandler != null) {checkpointHandler.onEvent(receivedBarrier);}}}//如果沒有找到對應的檢查點,則說明該barrier有可能是新檢查點的第一個barrierelse {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anyways//如果是比當前最新的檢查點編號還大,則說明是新檢查點if (barrierId > latestPendingCheckpointID) {latestPendingCheckpointID = barrierId;//添加進隊列到末尾pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpoints//如果超出閾值,則移除最老的檢查點if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}}小結
本篇文章剖析了Flink在fault tolerance時采用checkpoint barrier來實現多種一致性保證機制的核心代碼進行了分析。
微信掃碼關注公眾號:Apache_Flink
QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)
總結
以上是生活随笔為你收集整理的Apache Flink fault tolerance源码剖析(六)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LL和SC
- 下一篇: 怎样校验MD5码及sha1码数值(适用于