让时间倒流的保存点:用Apache Flink的保存点技术重新处理数据流
本文翻譯自:Savepoints: Turning Back Time, Reprocessing Data Streams with Savepoints in Apache Flink,原作者為Fabian Hueske(@fhueske)和Mike Winters(@wints)。翻譯已獲得原網站授權。
\\這篇文章是系列文章的第一篇,數據工匠團隊會在這里為大家展示一些Apache Flink的核心功能。
\\流處理通常被大家與“動態數據”關聯起來,相應的系統差不多會在數據被創造出來的那一刻就立刻對其進行處理或響應。像延遲、吞吐量、水印和處理遲到的數據等等都是大家討論得最多的流處理話題,通常是關注現在,而不是過去。
\\可在實際項目中,卻有許多種場景需要你的流處理程序把以前處理過的數據再重新處理一遍。這里有些例子:
\\- 為你的程序部署一個新版本,可能是有新功能、修復了問題、或者采用了更好的機器學習模型;\\t
- 使用相同的源數據流對應用程序的不同版本進行A/B測試,兩邊都從同一個點開始測試,這樣就不會犧牲之前的狀態;\\t
- 評估或開展將應用程序遷移到更新版本的處理框架上,或是一個不同的集群上;\
Apache Flink的保存點(Savepoint)功能可以支持上面的所有場景,并且也是讓Flink與其它分布式開源流處理器不同的一個顯著區別點。
\\在本文中,我們會講述如何使用保存點功能來重新處理數據,并一定程度地深入底層,講述這個功能在Flink中是怎么實現的。
\\“重新處理”到底是什么意思?
\\為了保證大家對重新處理數據的理解是一致的,我們先討論一個你可能需要重新處理數據的業務例子。想像一個社交媒體公司,她除了基本的發貼功能之外,還發布了一種付費的、或者說是推廣發貼的功能。
\\公司的用戶可以訪問一個簡單的、基于Flink實現的儀表板,顯示他們的所有文章(不管是普通的還是付費的)被大家查看、點擊等等的次數。幾個星期之后,從用戶的反饋中就可以清晰地看到,這個儀表板如果能把普通的發貼數據和付費的發貼數據區別開來,那就會更好用。
\\要實現這個功能,就有必要返回到付費發貼功能最初發布的那個時刻,然后從那個時刻開始,把所有數據全都重新處理一遍。這一次要把付費貼和普通貼的展示和交互全都拆開來。如果要把從公司創立伊始產生的數據全都重新處理一遍,這就實在有點強人所難,所以能夠從付費發貼的功能發布的時候開始重新處理,同時還保留之前的計算結果,這個功能就很有必要了。
\\所以當我們用到“重新處理”這個詞時,我們的意思就是回到一個系統以前的、一致的狀態(按開發者的定義,不一定非要是流的最早狀態),然后從那個狀態開始再處理一遍,可能也要在更改了你的Flink程序之后。
\\讀者們可以看到的好消息就是:Flink為大家免費提供了上述重新處理功能,相應的功能就叫保存點。我們說\"免費\",意思是只要你的程序是容錯的,并且可以從錯誤中恢復,那你就可以在Flink中創建一個保存點并重新處理數據,花費的額外準備工作量幾乎為零。
\\簡單說說保存點到底是什么
\\簡而言之,一個Flink程序的保存點就是關于以下兩點的全局一致的鏡像:
\\- 所有數據源的位置;\\t
- 所有并行操作者的狀態;\
“全局一致”意味著所有并行的操作者的狀態都在所有輸入的相同的明確定義的位置處被記錄下來了。
\\如果在過去的某個時刻,你為某個應用程序記下了保存點,那你就可以從那個保存點的位置開始啟動一個新程序。新的程序將使用那個保存點位置保存下來的操作者的狀態進行初始化,并且會從記錄的保存點里各個數據源的相應位置開始,重新處理全部數據。
\\因為Flink的保存點之間是相互完全獨立的,所以對每個程序你都可以有多個保存點,這樣你就可以根據這些不同的保存點的信息,回到不同的位置,啟動多次、甚至不同的程序(如下圖所示)。這個功能對于派生你的流處理程序,或者為它們打不同的版本,是非常有用的。
\\\\我們應該注意,在從某個保存點開始重新處理數據時,對事件的時間處理是非常重要的。重新處理基本上就意味著從過去到現在進行快速回放,也就是說,是全速地從某些存儲系統中讀出數據,直到趕上了當前的狀態,然后再繼續實時地處理新到達的數據。
\\因為程序對于時間的處理或者插入時間都是要依賴當前的本地時間的,那么如果在根據保存點啟動程序時不使用事件的時間,而使用別的時間,對程序的邏輯而言就很可能導致錯誤的結果。
\\聽起來不錯,那我該做什么?
\\不用做很多!事實上,所有支持故障恢復的程序都是自動支持保存點的。因此,大多數進行有狀態計算的程序已經滿足了需要的條件。如果沒有,可以對它們進行快速更新,讓它們具備:
\\- 啟用檢查點功能:在每種情況下,我們都推薦在構建Flink程序的同時,把檢查點功能打開,事實上在你的Flink程序中加上檢查點只是需要增加幾行代碼而已。\\t
- 可以重置的數據源(即Apache Kafka、Amazon Kinesis,或者文件系統等):數據源必須能按照你想要重新處理的點開始,重放數據。\\t
- 所有的狀態都通過Flink的管理狀態接口保存:所有具體的操作者的狀態都必須保存在Flink的容錯狀態數據結構中,這讓它可以按照某個之前的保存點位置被重置。\\t
- 配置一個合適的狀態后臺:Flink提供了不同的狀態后臺來將檢查點和保存點持久化。默認地,保存點都保存在JobManager中,但你要為你的程序配置一個適當的后臺狀態程序,比如RocksDB等。\
如果你已經在運行一個容錯的程序了,那就創建一個保存點,然后從保存點的位置開始重新啟動程序,這只需要在Flink命令行里敲幾個命令就可以了。咱們接下來挨個看看。
\\第一步:創建一個保存點
\\首先,獲得所有運行中的Flink任務的列表:
\\\user$ flink list\------------Running/Restarting Jobs------------\10.10.2016 16:20:33 : job_id : Sample Job (RUNNING)\\\(運行上面的命令時,你的真實任務ID會是一個包括字母和數字的字符串。)
\\然后,用相應的任務ID創建一個保存點:
\\\user$ flink savepoint job_id\\\現在你的保存點就已經可用了。
\\如果你準備馬上根據你的保存點來重新啟動任務,你通常會想要把現在正在運行的任務先停掉。你已經有了相應任務的ID,那把它停掉只要幾秒鐘就夠了:
\\\user$ flink cancel job_id\\\第二步:從一個保存點開始啟動任務
\\當你更新完程序之后,就可以從你的保存點開始啟動任務了。
\\\user$ flink run -d -s hdfs://savepoints/1 directory/your-updated-application.jar\\\如果你想在一個示例程序中自己重做這些步驟,我們推薦你看看一篇之前的博客文章,我們在那里講了怎么做這件事。
\\如果我想升級我的程序,該怎樣做?
\\如果你想從一個保存點開始啟動一個修改過的程序,有幾件事是要考慮的。我們可以區別下面這兩種情況:
\\第一種情況很簡單,不需要什么特別的準備。你可以按你的需要去修改函數代碼。不過,如果你用一個修改了的架構從保存點開始啟動程序,那么為了能夠恢復操作者的狀態,Flink必須能夠將保存點程序的操作者與使用了新架構的新程序的操作者對應起來。
\\在這種情況下,你就要手動地將操作者ID分配給最初的和更新了的程序。因為如果沒有操作者ID的話,是沒辦法修改程序的架構的。所以最佳實踐經驗就要求一定要分配操作者ID。
\\下面的代碼段顯示了如何為操作者們分配ID。
\\\DataStream stream = env.\ // Stateful source (e.g. Kafka) with ID\ .addSource(new StatefulSource())\ .uid(“source-id”)\ .shuffle()\ // The stateful mapper with ID\ .map(new StatefulMapper())\ .uid(“mapper-id”)\\// Stateless sink (no specific ID required)\stream.print()\\\請查閱文檔,了解更多關于升級程序和保存點的細節。
\\關于保存點的最佳實踐
\\要更好的利用上文中描述的Flink的重新處理功能,你應該經常觸發,生成新的保存點。我們建議要根據某些時刻表(比如每天一次,每周一次,等等)自動地生成保存點,而且每當你關閉某個任務或發布程序的新版本時,也最好先生成保存點。
\\依據你想用Flink做的事件不同,生成保存點的最佳方法也會不同,但總的來說,在構建你的程序時你應該花些時間考慮如何使用這些保存點。
\\這些東西是怎么工作的呢?
\\保存點事實上只是檢查點的一個延伸,這就是Flink的容錯機制。如果開啟了檢查點功能,Flink就會周期性地為所有的操作者狀態生成一個一致的檢查點。在文檔中詳細的描述了檢查點的細節,如果你是個Flink新手,花些時間去讀讀是非常值得的。
\\你可能會以為要生成一個一致的檢查點,就得暫停數據處理,因為Flink必須要等著,直到所有沒處理完的記錄全被處理掉了,然后做個鏡像,鏡像生成之后再回去繼續處理數據。事實并非如此!Flink是持續處理數據的,即使在生成檢查點的時候也是這樣。文檔中的“Barriers”一節講了實現這個功能的原理。
\\兩者之間的關鍵區別:檢查點是基于某些規定的時間間隔自動生成的,而保存點是由用戶顯式地觸發生成的,而且不會象檢查點那樣過了一定的時間之后就會被刪掉。
\\總結
\\我們討論了Apache Flink的保存點和數據重處理功能,因為我們相信這就是Flink與開源世界中其它流處理器之間的重要區別之一。而且最重要的,在容錯的Flink程序中獲得重處理功能幾乎是不需要任何代價的,只需要很少的改動。
\\Flink社區現在還在積極地工作著,要把保存點功能做得更好,包括在改變并發度的情況下保存狀態的解決方案等。有些相應的功能(比如Flink-3755)已經發布到主分支上了,而且會被包含到下一個小版本Flink 1.2.0中。
\\所以,當你需要把程序多部署一份,或者上個新版本,或者要做A/B測試,或者要讓多個程序從同一個點開始處理數據時,你可以這么做了,而且不會丟失那些寶貴的狀態數據。
\\當有真實的需求時,流處理基于實時的特性不應該阻擋你把時間調回過去的動作。
\\有興趣了解關于Apache FLink的保存點的更多內容嗎?數據工匠CTO Stephan Ewen做了一個關于這個話題的七分鐘白板演練,你可以在MapR博客上看到相關內容。
總結
以上是生活随笔為你收集整理的让时间倒流的保存点:用Apache Flink的保存点技术重新处理数据流的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一个SQL性能问题的优化探索(二)(r1
- 下一篇: 在CENTOS7下安装kubernete