基于Flink百亿数据实时去重
基于傳統的Set方法去重,以及弊端
- 去重處理方法:
需要一個全局 set集合來維護歷史所有數據的主鍵。當處理新日志時,需要拿到當前日志的主鍵與歷史數據的 set 集合按照規則進行比較,若 set集合中已經包含了當前日志的主鍵,說明當前日志在之前已經被處理過了,則當前日志應該被過濾掉,否則認為當前日志不應該被過濾應該被處理,而且處理完成后需要將新日志的主鍵加入到set 集合中,set 集合永遠存放著所有已經被處理過的數據
?
- 弊端:
百億數據去重,就按照每天100億數據來算,由于數據量巨大,所以主鍵占用的空間也大,如果主鍵占用空間小就意味著數據表示的范圍小,就有可能造成主鍵沖突。 例如int類型的主鍵數據范圍是-2147483648~2147483647,總共可以表示 42 億個數,如果這里每天百億的數據量選用 int類型做為主鍵的話,很明顯會有大量的主鍵發生沖突,會將不重復的數據認為是發生了重復。如果選取 UUID 做為日志的主鍵,UUID 會生成 36位的字符串。 例如:"e6f9c6v6-4c6f-41c1-9d30-c47j09r1284a"。每個主鍵占用 36 字節,每天 1 百億數據,36 字節 *100億 ≈ 360GB,這僅僅是一天的數據量。 所以該 set 集合要想存儲空間不發生持續地爆炸式增長,必須增加一個功能,那就是給所有的主鍵增加過期時間ttl。如果不增加 ttl,10 天數據量的主鍵占用空間就 3.6T,100 天數據量的主鍵占用空間 36T,所以在設計之初必須考慮為主鍵設定ttl。如果要求按天進行去重或者認為日志發生重復上報的時間間隔不可能大于 24 小時,那么為了系統的可靠性 ttl 可以設置為 36 小時。每天數據量 1百億,且 set 集合中存放著 36 小時的數據量,即 100 億 * 1.5 = 150 億,所以 set 集合中需要維護 150 億的數據量。且 set 集合中每條數據都增加了 ttl,意味著 set 集合需要為每條數據再附帶保存一個時間戳,來確定該數據什么時候過期。例如 Redis 中為一個key 設置了 ttl,如果沒有為這個 key 附帶時間戳,那么根本無法判斷該 key什么時候應該被清理。所以在考慮每條數據占用空間時,不僅要考慮數據本身,還需要考慮是否需要其他附帶的存儲。主鍵本身占用 36 字節加上 long 類型的時間戳8 字節,所以每條數據至少需要占用 44 字節,150 億 * 44 字節 = 660GB。所以每天百億的數據量,如果我們使用 set集合的方案來實現,至少需要占用 660GB 以上的存儲空間
基于HBase 維護全局 set 實現去重
-
去重處理方法:HBase 基于 rowkey Get的效率比較高,所以這里可以考慮將這個大的 set 集合以 HBase rowkey 的形式存放到 HBase 中。HBase 表設置 ttl 為 36小時,最近 36 小時的 150 億條日志的主鍵都存放到 HBase 中,每來一條數據,先拿到主鍵去 HBase 中查詢,如果 HBase表中存在該主鍵,說明當前日志已經被處理過了,當前日志應該被過濾。如果 HBase表中不存在該主鍵,說明當前日志之前沒有被處理過,此時應該被處理,且處理完成后將當前主鍵 Put 到 HBase 表中。由于數據量比較大,所以一定要提前對HBase 表進行預分區,將壓力分散到各個 RegionServer 上,避免產生數據熱點問題。
-
弊端:
-
HBase 去重到底能不能保證 Exactly Once?這里用計算PV的案例來分析:
假如 PV 信息維護在 Flink 的狀態中,通過冪等性將 PV 統計結果寫入到 Redis 供其他業務方查詢實時統計的 PV 值。如下圖所示,Flink處理完日志 b 后進行 Checkpoint,將 PV = 2 和 Kafka 對應的 offset 信息保存起來,此時 HBase 表中有兩條rowkey 分別是 a、b,表示主鍵為 a 和 b 的日志已經被處理過了。接著往后處理,當處理完日志 d 以后,PV = 4,HBase 表中有 4 條 rowkey 分別是 a、b、c、d,表示主鍵為 a、b、c、d的日志已經被處理過了。但此時機器突然故障,導致 Flink 任務掛掉,如右圖所示 Flink 任務會從最近一次成功的 Checkpoint處恢復任務,從日志 b 之后的位置開始消費,且 PV 恢復為 2,因為處理完日志 b 時 PV 為 2。但由于 HBase 中的數據不是由 Flink 來維護,所以無法恢復到 Checkpoint 時的狀態。所以 Flink 任務恢復后,PV = 2 且HBase 中 rowkey 為 a、b、c、d。此時 Flink 任務從日志 c 開始繼續處理數據,當處理日志 c 和 d 時,Flink 任務會先查詢HBase,發現 HBase 中已經保存了主鍵 c 和 d,所以認為日志 c 和 d 已經被處理了,會將日志 c 和 d過濾掉,于是就產生了丟數據的現象,日志 c 和 d 其實并沒有參與 PV 的計算。
不將 PV 信息維護在 Flink 狀態中僅僅在 Redis 中保存 PV 結果,每處理一條數據,將 Redis中的 PV 值加一即可。如下圖所示,PV 不維護在狀態中,所以當處理完日志 b 進行 checkpoint 時,只會將當前消費的 offset信息維護起來。處理完日志 d 以后,由于機器故障,Flink 任務掛掉,任務依然會從日志 b 之后開始消費,此時 Redis 中保存的 PV=4,且HBase 中保存的 rowkey 信息為 a、b、c、d。緊接著開始處理 c 和 d,因為 HBase 中保存了主鍵 c、d,因此不會重復處理日志c、d,因此 PV 值計算正確,也不會出現重復消費的問題。
這種策略貌似沒有問題,但是問題百出。我們的任務處理某個元素需要兩個操作:① 將 Redis 中 PV 值加一 ② 將主鍵 id 加入到 HBase由于 Redis 和 HBase 都不支持事務,所以以上兩個操作并不能保障原子性。如果代碼中先執行步驟 ①,可能會造成 ① 執行成功 ②還未執行成功,那么恢復任務時 PV=4,HBase 中保存主鍵 a、b、c,此時日志 d 就會重復計算,就會造成 PV值計算偏高的問題。如果代碼中先執行步驟 ②,可能會造成 ② 執行成功 ① 還未執行成功,那么恢復任務時 PV=3,HBase 中保存主鍵a、b、c、d,此時日志 d 就會被漏計算,就會造成 PV 值計算偏低的問題。這里只是拿 HBase 舉例而已,上述情況中外部的任何存儲介質維護 set集合都不能保證 Exactly Once,因為 Flink 從 Checkpoint 處恢復時,外部存儲介質并不能恢復到 Checkpoint時的狀態。
終極版:使用 Flink 的 KeyedState 實現去重
既然外部存儲介質不能恢復到 Checkpoint 時的狀態,那使用 Flink 內置的狀態后端就可以完美解決。當任務從 Checkpoint處恢復時,就可以拿到 Checkpoint 時的狀態快照信息如下圖所示,可以將主鍵信息維護在 Flink 的狀態中,當處理完日志 b 時,將 PV=2和狀態中的主鍵信息:a、b 一塊保存到狀態后端。無論后續什么情況發生,只要從 chk-1 對應的 Checkpoint 處恢復,那么會將 PV=2和狀態中的主鍵信息:a、b 做為一個整體來恢復。所以就可以保障 Exactly Once 了。
-
如何使用 KeyedState 維護 set 集合?① Flink三種狀態后端中這里選用 RocksDBStateBackend 將狀態信息存儲在 TaskManager 本地的 RocksDB數據庫中,② 其次KeyedState 數據結構選用ValueStateValueState。當處理一條日志時,根據日志的主鍵 id 從 ValueState 中 get 數據,如果不為 null就認為當前處理的日志在之前已經被處理過了,此時應該被過濾;如果為 null 就認為當前日志在之前還沒有被處理過,此時應該被處理,并且需要 update一個值到 ValueState 中,來標識當前日志被處理過了
調優
優化主鍵來減少狀態大小,且提高吞吐量
將每天百億數據量的主鍵通過 hash 算法轉換為 long 類型,然后我們可以把 long 類型的數據當做主鍵來存儲。(提問為啥要選用long類型?而不是int,那么hash沖突的概率如何呢?)
設置本地 RocksDB 的數據目錄
建議在 flink-conf.yaml 中配置 state.backend.rocksdb.localdir 參數來指定 RocksDB在磁盤中的存儲目錄。當一個 TaskManager 包含 3 個 slot時,那么單個服務器上的三個并行度都對磁盤造成頻繁讀寫,從而導致三個并行度的之間相互爭搶同一個磁盤 IO,這樣務必導致三個并行度的吞吐量都會下降。慶幸的是 Flink 的 state.backend.rocksdb.localdir參數可以指定多個目錄,一般大數據所使用的服務器都會掛載很多塊硬盤,我們期望三個并行度使用不同的硬盤從而減少資源競爭
總結
以上是生活随笔為你收集整理的基于Flink百亿数据实时去重的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 乡村田园风的美食账号为何能在强者如云的美
- 下一篇: 田园农家