webform计算某几列结果_大数据测试场景科普 流计算篇 (上)
前言
3年前的時候寫過關于一些大數據入門基礎的文章, 當時學習的是spark。文章鏈接如下:
大數據介紹:https://testerhome.com/topics/7988
spark基礎操作:https://testerhome.com/topics/8040
shuffle和性能測試:https://testerhome.com/topics/8120
離線大數據作業的測試方法:https://testerhome.com/topics/17092
這一篇算是彌補了之前對于流計算的缺失吧。由于我們產品在今年加入了流計算的能力, 并且Flink貌似也有要在流計算領域中一統江山的架勢,所以我前段時間借著調研混沌工程方案的契機,也開始學習了Flink并了解我們產品對于流計算的應用場景(PS:混沌工程是流計算中一個比較重要的測試手段)。今天把學習和實踐的一些總結分享出來。
什么場景需要流計算
流計算一般都是在一些數據計算的實時性要求很高的場景中出現, 之前在講spark的時候都是基于離線的批處理計算的, 這種計算方式無法滿足產品對實時性的要求。比如如果我們要在大看板上計算PV和UV的數據, 一般都是希望能夠實時的觀看到這些數據的變化。而計算PV和UV的操作又不好嵌入到業務系統中, 因為對業務侵入性太強并且會影響性能。所以一般的架構可能是如下的樣子:
業務系統會將用戶行為數據push到消息中間件(kafka)中, 這樣達到了解耦和降低性能開銷的目的。而flink streaming 服務會訂閱kafka的topic進行流處理, 也就是一旦有數據從kafka中發送過來,滿足一定條件后就會觸發flink的一系列算子進行計算, 數據在flink中的這些算子中進行傳遞,聚合,計算等操作后, 將經過處理的數據推送給外部的存儲系統或者業務系統, 這些系統會將數據做進一步保存和處理后展示在大屏上--這就是一種計算PV和UV的簡單的場景了。整個過程之所以叫流,就是因為數據并不是像傳統的方式保存到文件系統中,在保存到了一定的量或者利用定時任務觸發批處理計算的方式執行。而是數據就像一條pipeline(流水線)一樣,來一個(或者一小批,可以規定時間窗口,可以規定數據個數)就處理一個,并把處理結果傳到pipeline的下一個算子上繼續處理,這種方式是不是有點像jenkins的piepline~ 實際上很多AI系統,比如推薦系統,反欺詐系統這些對實時性要求比較高的場景都要利用流計算來實時的進行處理。如果要展開它的內部處理過程大概是下面這個圖:
重點說明一下上圖中的task, 這個task就可以理解為Flink中的算子了, 也叫operator。在Flink中可以定義當數據到來的時候, 都經過哪些算子,按照什么順序進行計算。比如可以先使用filter算子把沒用的數據進行過濾, 再使用map算子對原始數據做一些轉換, 后面再使用sum算子進行累加計算出PV。當然這些task是可以并行計算的,Flink可以合并計算結果。要是寫代碼的話,大概是下面這個樣子:
上面是大數據領域經典demo workcount, 計算文件中的詞頻。上面我用紅色框起來的部分就是算子, 先是flatmap做一些處理, 再使用keyBy算子把數據分類, 把有相同的key的數據分到一個組里,然后進行sum的累加計算, 這樣就能計算出每個key(單詞)的詞頻了(這個單詞出現多少次)。PS:代碼里的keyBy(0)中的0是數據的第幾列, 意思是按第幾列進行分組。
OK, 上面就是簡單講講什么是流計算以及什么場景需要流計算。flink的算子和運行模式跟spark是很像的,對flink的使用還有疑問的同學可以看看我之前寫的spark基礎。下面要開始講測試點了。
從消息中間件說起
好像現在業界主流的能支持流計算的消息中間件也就只有kafka了,所以我下面都用kafka來舉例(實際上我也只用過kafka,請原諒我知識上的匱乏),這里我要講一下kafka的精準一次性語義, 之所以講這個是要開始講述流計算中最重要最難以驗證的一個場景(對,我就是想先講難的,重要的) ---- 數據一致性。什么是數據一致性呢,就是不論在任何情況下數據被處理的結果都是一致的。這里說的任何情況包括但不限于:
計算任務異常重啟后導致之前已經計算過的數據丟失
計算任務異常重啟后導致之前已經計算過的數據在本次任務進行重試的時候造成的數據重復計算
計算任務不能因為網絡延遲,異常等因素導致數據傳遞給下游系統失敗后導致的數據丟失
那么我們看kafka是怎么處理這種情況的, 當我們使用kafka的producer向broker推送消息的時候,怎么能保證本次推送的消息不會因為各種異常導致數據丟失呢?很多小伙伴可能已經想到了重試, 如果因為網絡異常等原因導致push請求異常的話,那么我們重試幾次就好了,畢竟kafka開了高可用模式,集群上會有其他的broker提供服務,就算當前的broker徹底跪了數據也不會丟失的。但是我們是否想過一個問題,重試請求是可以隨便執行的么?或者說程序怎么能確定本次失敗的推送請求就是真的失敗了,也就是數據沒有保存到kafka上。在kafka中確認消息是否推送成功是需要producer和broker互相交換ACK的, 也就是producer在把消息推送給broker后,broker在保存成功后要給producer回一個ACK讓 客戶端知道消息已經保存成功了。那么如果我們的異常是發生在broker已經保存好數據和把ACK發送到客戶端之間呢?也就是數據已經保存好了, 只是沒有給客戶端返回ACK,所以客戶端認為這個推送消息的請求是失敗的。那么這個時候如果我們執行了retry的邏輯,實際上數據就出現了重復的場景。
這么解釋大家是不是就明白了retry的邏輯不是能隨隨便便加的,它有一個前提條件, 就是它要retry的那個接口必須是冪等的。這個我再當初講混沌工程的時候也提過, 一個高可用的系統,它的接口必須是冪等的, 因為高可用的模式說白了就是上游系統retry,下游系統多副本負載均衡+冪等。只有下游系統有冪等的能力,上游系統才敢執行重試操作, 否則的話就是數據重復寫。那么冪等是什么意思呢, 大白話就是接口自己能判斷出當前的請求是不是之前已經發送過的重復數據了,如果是重復數據它是不處理的。行話就是同樣的數據不論計算多少次都不對結果造成影響,此為冪等。
而kafka的精準一次性語義中定義了幾個級別的模式, 其中有一個叫exactly once(精準一次性語義,意思是我保證針對一個數據不管你重復發送多少次,服務端都只計算一次)這種模式就可以解決這個問題。在producer中可以設置冪等和分布式事務相關的參數和代碼, 一旦這樣設置了,那么就擁有了冪等屬性, kafka內部會根據算法計算出消息的唯一id,broker只要查詢消息的id在之前是否有保存過就可以判斷出當前消息是否是重復數據了(大概是這樣,細節沒研究過)。這樣客戶端就可以肆無忌憚的進行重試而不必擔心數據重復計算。
再談Flink的exactly once
通過講述Kafka的精準一次性語義也就是exactly once 是為了跟大家講述什么是數據一致性以及保證數據一致的方法和重要性。由于kafka本身提供了這種特性所以要保證消息傳送到kafka的數據一致性是比較容易的, 正因為很容易一般不容易出錯所以很多團隊都忘了去測試這個場景(有時候研發會忘了設置這個參數導致出現bug,所以最好還是需要測一下)。當時光保證kafka的精準一次性是不行, 我們是一個業務場景, 我們需要的是端到端的一致性, 得是全鏈路的一致性。所以現在我們來看看Flink這一層怎么做的exactly once。
Checkpoint
講到這里就必須要說明一下大名鼎鼎的checkpoint了, 基本上checkpoint是所有分布式框架都要有的機制,spark如此flink亦如此。checkpoint就是一種保存我們在計算過程中的數據的方式, 它會根據設置周期性的觸發checkpoint來保存我們計算的中間結果。我們還是用PV的案例說明:
我們從Kafka讀取到一條條的消息,從消息中解析出app_id,然后將統計的結果放到內存中一個Map集合,app_id做為key,對應的pv做為value,每次只需要將相應app_id 的pv值+1后put到Map中即可。
這里簡要說明一下kafka的offset, 這個是消費消息的客戶端也就是consumer要使用offset來記錄我已經讀取到了消息隊列中的哪一條數據, 根據這個offset我可以知道下一次我要讀取的消息的位置。即便是程序崩潰了, 只要offset能夠保存下來就知道恢復后應該從哪個消息開始讀取了。所以在這個機制下,flink的Source task記錄了當前消費到kafka test topic的所有partition的offset。所以flink會根據策略周期性的觸發checkpoint事件以流的方式傳遞給所有的算子, 算子收到checkpoint命令后就會把中間狀態保存起來, 比如在我們的案例里保存的就是kafka的offset, 比如我們設置每30s觸發一次checkpoint, 那么30s后checkpoint觸發,保存的數據為:
chk-100
offset:(0,1000)
pv:(app1,50000)(app2,10000)
該狀態信息表示第100次CheckPoint的時候, partition 0 offset消費到了1000,pv統計結果為(app1,50000)(app2,10000)。那么如果任務掛了,這時候怎么辦?比如:
假如我們設置了三分鐘進行一次CheckPoint,保存了上述所說的 chk-100 的CheckPoint狀態后,過了十秒鐘,offset已經消費到 (0,1100),pv統計結果變成了(app1,50080)(app2,10020),但是突然任務掛了,怎么辦?
flink只需要從最近一次成功的CheckPoint保存的offset(0,1000)處接著消費即可,當然pv值也要按照狀態里的pv值(app1,50000)(app2,10000)進行累加,不能從(app1,50080)(app2,10020)處進行累加,因為 partition 0 offset消費到 1000時,pv統計結果為(app1,50000)(app2,10000)。
上面講的并行度為1的情況, 那么如果并行度是N的情況,checkpoint會在并行的算子里觸發,這個時候Flink會選擇是保持多個checkpoint一起執行完后在統一往后運算(exactly once), 還是選擇不去協調,任意一個算子運行完checkpoint后就當前線程就繼續往下運算(at least once),因為at least once模式會造成并行的算子的checkpoint不是同時觸發和結束, 所以他們保存的中間態數據有偏差,也就是數據是會不一致。所以如果業務場景有數據強一致性的需求,那么需要將checkpoint模式設置為exactly once。這里大家能明白了么?我們通過把kafka的offset和我們已經計算好的結果都通過checkpoint進行保存來防止數據丟失或重復計算的情況。代碼差不多如下:
當然上面是checkpoint策略, 在實際開發算子任務的時候,要把什么數據通過checkpoint保存到flink的state backend是需要先調用對應的state 方法來執行的。
貼一個checkpoint的圖:
說回數據一致性
好了上面說了那么多東西, 但是好像kafka和Flink 都已經把數據一致性保證好了, 那還需要我們測試什么一致性么?那不是變成了在測試kafka或者flink么?我想一定會有同學這么問, 那么我在這里解釋下:
即便kafka和flink有exactly once 語義, 但是開啟這些語義需要對應的參數調整, 并且需要編碼的時候進行處理, 比如kafka里在開啟了exactly once 語義后, 也需要研發在代碼里顯示調用分布式事務進行數據計算, flink里對于kafka的offset和計算結果的保存也需要顯示在代碼里調用類似valueState來進行保存和處理。也就是你們的產品研發同學是否編碼正確決定了數據一致性。
在我們的流計算里, flink上下游都會對接不同的系統, 上游可以是kafka,也可以是業務系統暴露出來的socket服務,也可以其他的源。所以你在使用非kafka也就是沒有exactly once語義支持的系統的時候,就需要研發去開發相應的方案來解決這個問題。同理輸出方, 流是有數據的源,也有在經過flink計算之后輸出的系統,這個系統可以是另外一個kafka,也可以是mysql, 也可以是業務系統的接口。那么輸出方是否有exactly once語義支持呢?非kafka的場景下,基本上也是沒有的, 也需要研發來開發對應的方案。也就是說我程序中Flink的CheckPoint語義設置了 Exactly Once,但是我在計算的過程中需要實時的把計算結果保存到mysql里,那異常出現的時候根據checkpoint機制,我們從上一個checkpoint記錄中保存的offset去重新讀取并計算消息, 這時候我的mysql中看到豈不是看到了數據重復了?比如程序中設置了1分鐘1次CheckPoint,但是5秒向mysql寫一次數據,并commit。所以我們要求的是Flink的end to end的精確一次都必須實現。如果你的chk-100成功了,過了30秒,由于5秒commit一次數據庫,所以實際上已經寫入了6批數據進入mysql,但是突然程序掛了,從chk100處恢復,這樣的話,之前提交的6批數據就會重復寫入,所以出現了重復消費。Flink的精確一次有兩種情況,一個是Flink內部的精確一次,一個是端對端的精確一次。這里面有點繞,我解釋的有點啰嗦。
所以根據上面說的,雖然flink提供了exactly once 語義, 但是它的exactly once 語義只保證flink自己的數據計算過程,而不是端到端的。想要保證數據一致性,還是需要研發同學針對業務場景進行特殊的設計。也就是開發自己產品的exactly once 語義。所以我們還是要針對端到端的場景進行測試。
測試的注意事項
首先弄清楚產品中流計算的架構,都有哪些數據源,數據又發送到哪些地方。這一步至關重要, 因為端到端的數據一致性場景,在這一條流式鏈條里,任何一個點沒有做到精準一次性語義都會導致數據不一致,所以我們要測試所有的點。
完成第一步后在每一個點進行故障注入,故意讓任務失敗,讓服務掛掉, 屬于混沌工程式的測試方法, 就是想盡辦法讓這個流式的鏈條中的服務出故障來驗證數據一致性。沒有測試工具的同學可以去看一下阿里開源的chaos blade。注意:一個場景的故障注入要反復進行,比如30分鐘內每隔3分鐘都隨機找到一個flink task manager進行kill來注入故障, 有些時候只注入一次故障是發現不了bug的,因為我們是有狀態計算,有狀態計算的場景很多是在特殊的狀態下發生故障才會出錯。所以要反復注入故障來最大概率的觸發bug。
自動化測試中case要驗證數據一致性的點,比如在kafka->flink-mysql 的這個場景里,你往數據源kafka里灌入了1000個消息,如果正確的邏輯是經過計算后要往mysql存入10條記錄, 那么你要去驗證這10條記錄的正確性。是否有數據丟失或者重復的結算結果出現。
注意:做這個測試前, 先確定你們是否有數據一致性的強需求。有些場景真的會覺得數據丟了就丟了。。。。。
結尾
好了寫這么多, 今天羅里吧嗦的寫了一大堆好像就說了一個數據一致性的測試。之前在社區跟人討論的時候,有很多同學其實不贊同這種深入研發架構的測試方式。而我前兩天刷脈脈的時候也在匿名區看到有人發消息, 討論區里對qa是否要測試這種場景有很大的爭議。所以我花了很大的篇幅解釋一下做這種測試的必要性。下次我們將其他的測試方法。
打下廣告,社區線上沙龍,12月27日,就是本周日下午,WeTest,微信,網易和大家一起探討 devops下的質量保障!由于社區小助手失聯了。請大家加下面兩位同學的微信,給大家拉到討論群中去~
xiaozhao129540
oscarx1982
總結
以上是生活随笔為你收集整理的webform计算某几列结果_大数据测试场景科普 流计算篇 (上)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 苹果a1863是什么版本
- 下一篇: lol最克制诺手的英雄_上路克制诺手的英