美团点评基于Storm的实时数据处理实践
背景
目前美團點評已累計了豐富的線上交易與用戶行為數據,為商家賦能需要我們有更強大的專業化數據加工能力,來幫助商家做出正確的決策從而提高用戶體驗。目前商家端產品在數據應用上主要基于離線數據加工,數據生產調度以“T+1”為主,伴隨著越來越深入的精細化運營,實時數據應用訴求逾加強烈。本文將從目前主流實時數據處理引擎的特點和我們面臨的問題出發,簡單的介紹一下我們是如何搭建實時數據處理系統。
設計框架
目前比較流行的實時處理引擎有 Storm,Spark Streaming,Flink。每個引擎都有各自的特點和應用場景。 下表是對這三個引擎的簡單對比:
考慮到每個引擎的特點、商家端應用的特點和系統的高可用性,我們最終選擇了 Storm 作為本系統的實時處理引擎。
面臨的問題
具體的實施方案
實時攝入數據完整性保障
數據完整性保證層:如何保證數據攝入到計算引擎的完整性呢?正如表格中比較的那樣,Storm 框架的語義為 At Least Once,至少攝入一次。這個語義的存在正好保證了數據的完整性,所以只需要根據自己的需求編寫 Spout 即可。好消息是我們的技術團隊已經開發好了一個滿足大多數需求的 Spout,可以直接拿來使用。特別需要注意的一點,在數據處理的過程中需要我們自己來剔除已經處理過的數據,因為 Storm 的語義會可能導致同一條數據攝入兩次。灰度發布期間(一周)對數據完整性進行驗證,數據完整性為100%。
實時數據平滑處理
數據預測層:實時的數據預測可以幫助我們對到達的數據進行有效的平滑,從而可以減少在某一時刻對集群的壓力。 在數據預測方面,我們采用了在數學上比較簡單的多元線性回歸模型(如果此模型不滿足業務需求,可以選用一些更高級別的預測模型),預測下一分鐘可能到來的數據的量。在數據延遲可接受的范圍內,對數據進行平滑,并完成對數據的計算。通過對該方案的使用,減輕了對集群約33%的壓力。具體步驟如下:
- 步驟一:將多個業務的實時數據進行抽象化,轉換為(Y_i,X_1𝑖,X_2𝑖,X_3i,… ,X_ni),其中Y_i為在(X_1i…X_ni)屬性下的數據量,(X_1i…X_ni)為n個不同的屬性,比如時間、業務、用戶的性別等等。
- 步驟二:因為考慮到實時數據的特殊性,不同業務的數據量隨時間變量基本呈現為M走勢,所以為了將非線性走勢轉換為線性走勢,可以將時間段分為4部分,保證在每個時間段內數據的走勢為線性走勢。同理,如果其他的屬性使得走勢變為非線性,也可以分段分析。
步驟三:將抽象好的數據代入到多元線性回歸模型中,其方程組形式為:
即:
通過對該模型的求解方式求得估計參數,最后得多元線性回歸方程。
步驟四:數據預測完之后通過控制對數據的處理速度,保證在規定的時間內完成對規定數據的計算,減輕對集群的壓力。
實時數據計算策略
策略層:Key/Value 模式更適應于實時數據模型,不管是在存儲還是計算方面。Cellar(我們內部基于阿里開源的Tair研發的公共KV存儲)作為一個分布式的 Key/Value 結構數據的解決方案,可以做到幾乎無延遲的進行 IO 操作,并且可以支持高達千萬級別的 QPS,更重要的是 Cellar 支持很多原子操作,運用在實時數據計算上是一個不錯的選擇。所以作為數據的落腳點,本系統選擇了Cellar。
但是在數據計算的過程中會遇到一些問題,比如說統計截止到當前時刻入住旅館的男女比例是多少?很容易就會想到,從 Cellar 中取出截止到當前時刻入住的男生是多少,女生是多少,然后做一個比值就 OK 了。但是本系統是在多線程的環境運行的,如果該時刻有兩對夫婦入住了,產生了兩筆訂單,恰好這兩筆訂單被兩個線程所處理,當線程A將該男士計算到結果中,正要打算將該女士計算到結果中的時候,線程B已經計算完結果了,那么線程B計算出的結果就是2/1,那就出錯啦。
所以為了保證數據在多線程處理時數據計算的正確性,我們需要用到分布式鎖。實現分布式鎖的方式有很多,本文就不贅述了。這里給大家介紹一種更簡單快捷的方法。Cellar 中有個 setNx 函數,該函數是原子的,并且是(Set If Not Exists),所以用該函數鎖住關鍵的字段就可以。就上面的例子而言,我們可以鎖住該旅館的唯一 ID 字段,計算完之后 delete 該鎖,這樣就可以保證了計算的正確性。
另外一個重要的問題是 Cellar 不支持事務,就會導致該計算系統在升級或者重啟時會造成少量數據的不準確。為了解決該問題,運用到一種 getset 原子思想的方法。如下:
public void doSomeWork(String input) {cellar.mapPut("uniq_ID");cellar.add("uniq_ID_1","some data");cellar.add("uniq_ID_2","some data again");....cellar.mapRemove("uniq_ID"); }如果上述代碼執行到[2..5]某一行時系統重啟了,導致后續的操作并沒有完成,如何將沒有完成的操作添加上去呢?如下:
public void remedySomething() {map = cellar.mapGetAll();version = cellar.mapGet("uniq_ID").getVersion();for (string str : map) {if (cellar.get(str + "_1").getVersion()!= version) {cellar.add(str + "_1", "some data");cellar.mapRemove(str);}.......} }正如代碼里那樣,會有一個容器記錄了哪些數據正在被操作,當系統重啟的時候,從該容器取出上次未執行完的數據,用 Version(版本號)來記錄哪些操作還沒有完成,將沒有完成的操作補上,這樣就可以保證了計算結果的準確性。起初 Version(版本號)被設計出來解決的問題是防止由于數據的并發更新導致的問題。
比如,系統有一個 value 為“a,b,c”,A和B同時get到這個 value。A執行操作,在后面添加一個d,value 為 “a,b,c,d”。B執行操作添加一個e,value為”a,b,c,e”。如果不加控制,無論A和B誰先更新成功,它的更新都會被后到的更新覆蓋。Tair 無法解決這個問題,但是引入了version 機制避免這樣的問題。還是拿剛才的例子,A和B取到數據,假設版本號為10,A先更新,更新成 功后,value 為”a,b,c,d”,與此同時,版本號會變為11。當B更新時,由于其基于的版本號是10,服務器會拒絕更新,從而避免A的更新被覆蓋。B可以選擇 get 新版本的 value,然后在其基礎上修改,也可以選擇強行更新。
將 Version 運用到事務的解決上也算是一種新型的使用。為驗證該功能的正確性,灰度發布期間每天不同時段對項目進行殺死并重啟,并對數據正確性進行校驗,數據的正確性為100%。
實時數據存儲
為了契合更多的需求,將數據分為三部分存儲。
Kafka:存儲稍加工之后的明細數據,方便做更多的擴展。 MySQL:存儲中間的計算結果數據,方便計算過程的可視化。 Cellar:存儲最終的結果數據,供應用層直接查詢使用。
應用案例
美團開店寶作為美團商家的客戶端,支持著眾多餐飲商家的輔助經營,而經營數據的實時性對影響商家決策尤為重要。該功能上線之后受到了商家的熱烈歡迎。卡片展示如下圖:
該功能用于與美團點評金融合作商家增加支付標簽,用以突出這些商家,增加營銷點。另一方面為優質商家吸引更多流量,為平臺帶來更多收益。展示如下圖:
總結與展望
以上就是該系統的設計框架與思路,并且部分功能已應用到系統中。為了商家更好的決策,用戶更好的體驗,在業務不斷增長的情況下,對實時數據的分析就需要做到更全面。所以實時數據分析還有很多東西可以去做。
老生常談的大數據 4V+1O 特征,即數據量大(Volume)、類型繁多(Variety)、價值密度低(Value)、速度快時效性高(Velocity)、數據在線(Online),相比離線數據系統,對實時數據的計算和應用挑戰尤其艱巨。在技術框架演進層面,對流式數據進行高度抽象,簡化開發流程;在應用端,我們后續希望在數據大屏、用戶行為分析產品、營銷效果跟蹤等 DW/BI 產品進行持續應用,通過加快數據流轉的速度,更好的發揮數據價值。
參考
- 多元線性回歸模型
關于我們
到餐數據團隊,用業內最先進的理念建設數據相關的系統和應用,期待更多數據系統開發、數據倉庫開發、數據建模好手的加入。 發郵件給liuqiang24@meituan.com 、xuyang14@meituan.com 、xuyang14@meituan.com。
總結
以上是生活随笔為你收集整理的美团点评基于Storm的实时数据处理实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何才能真正的提高自己,真正成为一名出色
- 下一篇: 函数式编程在Redux/React中的应