拆解大数据总线平台DBus的系统架构
Dbus所支持兩類數(shù)據(jù)源的實現(xiàn)原理與架構拆解。
大體來說,Dbus支持兩類數(shù)據(jù)源:
- RDBMS數(shù)據(jù)源
- 日志類數(shù)據(jù)源
一、RMDBMS類數(shù)據(jù)源的實現(xiàn)
以mysql為例子. 分為三個部分:
- 日志抽取模塊
- 增量轉換模塊
- 全量拉取模塊
1.1 日志抽取模塊(Extractor)
mysql 日志抽取模塊由兩部分構成:
- canal server:負責從mysql中抽取增量日志。
- mysql-extractor storm程序:負責將增量日志輸出到kafka中,過濾不需要的表數(shù)據(jù),保證at least one和高可用。
我們知道,雖然mysql innodb有自己的log,mysql主備同步是通過binlog來實現(xiàn)的。而binlog同步有三種模式:Row 模式,Statement 模式,Mixed模式。因為statement模式有各種限制,通常生產環(huán)境都使用row模式進行復制,使得讀取全量日志成為可能。
通常我們的mysql布局是采用 2個master主庫(vip)+ 1個slave從庫 + 1個backup容災庫 的解決方案,由于容災庫通常是用于異地容災,實時性不高也不便于部署。
為了最小化對源端產生影響,我們讀取binlog日志從slave從庫讀取。
讀取binlog的方案比較多,DBus也是站在巨人的肩膀上,對于Mysql數(shù)據(jù)源使用阿里巴巴開源的Canal來讀取增量日志。這樣做的好處是:
- 不用重復開發(fā)避免重復造輪子
- 享受canal升級帶來的好處
關于Canal的介紹可參考:https://github.com/alibaba/canal/wiki/Introduction 由于canal用戶抽取權限比較高,一般canal server節(jié)點也可以由DBA組來維護。
日志抽取模塊的主要目標是將數(shù)據(jù)從canal server中讀出,盡快落地到第一級kafka中,避免數(shù)據(jù)丟失(畢竟長時間不讀日志數(shù)據(jù),可能日志會滾到很久以前,可能會被DBA刪除),因此需要避免做過多的事情,主要就做一下數(shù)據(jù)拆包工作防止數(shù)據(jù)包過大。
從高可用角度考慮,在使用Canal抽取過程中,采用的基于zookeeper的Canal server高可用模式,不存在單點問題,日志抽取模塊extractor也使用storm程序,同樣也是高可用架構。
不同數(shù)據(jù)源有不同的日志抽取方式,比如oracle,mongo等都有相應的日志抽取程序。
DBus日志抽取模塊獨立出來是為了兼容這些不同數(shù)據(jù)源的不同實現(xiàn)方式。
1.2 增量轉換模塊(Stream)
增量數(shù)據(jù)處理模塊,根據(jù)不同的數(shù)據(jù)源類型的格式進行轉換和處理。
1)分發(fā)模塊dispatcher
- 將來自數(shù)據(jù)源的日志按照不同的schema分發(fā)到不同topic上。這樣做的目的
- 是為了數(shù)據(jù)隔離(因為一般不同的shema對應不同的數(shù)據(jù)庫)
- 是為了分離轉換模塊的計算壓力,因為轉換模塊計算量比較大,可以部署多個,每個schema一個提高效率。
2)轉換模塊appender
- 實時數(shù)據(jù)格式轉換:Canal數(shù)據(jù)是protobuf格式,需要轉換為我們約定的UMS格式,生成唯一標識符ums_id和ums_ts等;
- 捕獲元數(shù)據(jù)版本變更:比如表加減列,字段變更等,維護版本信息,發(fā)出通知觸發(fā)告警
- 實時數(shù)據(jù)脫敏:根據(jù)需要對指定列進行脫敏,例如替換為***,MD5加鹽等。
- 響應拉全量事件:當收到拉全量請求時為了保證數(shù)據(jù)的相應順序行,會暫停拉增量數(shù)據(jù),等全量數(shù)據(jù)完成后,再繼續(xù)。
- 監(jiān)控數(shù)據(jù):分發(fā)模塊和轉換模塊都會響應心跳event,統(tǒng)計每一張表在兩次心跳中的數(shù)據(jù)和延時情況,發(fā)送到statistic作為監(jiān)控數(shù)據(jù)使用。
- 分發(fā)模塊和轉換模塊都會相應相關reload通知事件從Mgr庫和zk上進行加載配置操作。
1.3 全量拉取模塊(FullPuller)
全量拉取可用于初始化加載(Initial load), 數(shù)據(jù)重新加載,實現(xiàn)上我們借鑒了sqoop的思想。將全量過程分為了2 個部分:
1)數(shù)據(jù)分片
分片讀取max,min,count等信息,根據(jù)片大小計算分片數(shù),生成分片信息保存在split topic中。下面是具體的分片策略:
以實際的經驗,對于mysql InnDB,只有使用主鍵索引進行分片,才能高效。因為mysql innDB的主鍵列與數(shù)據(jù)存儲順序一致。
2)實際拉取
每個分片代表一個小任務,由拉取轉換模塊通過多個并發(fā)度的方式連接slave從庫進行拉取。 拉取完成情況寫到zookeeper中,便于監(jiān)控。
全量拉取對源端數(shù)據(jù)庫是有一定壓力的,我們做法是:
- 從slave從庫拉取數(shù)據(jù)
- 控制并發(fā)度6~8
- 推薦在業(yè)務低峰期進行
全量拉取不是經常發(fā)生的,一般做初始化拉取一次,或者在某種情況下需要全量時可以觸發(fā)一次。
1.3 全量和增量的一致性
在整個數(shù)據(jù)傳輸中,為了盡量的保證日志消息的順序性,kafka我們使用的是1個partition的方式。在一般情況下,基本上是順序的和唯一的。 但如果出現(xiàn)寫kafka異步寫入部分失敗, storm也用重做機制,因此,我們并不嚴格保證exactly once和完全的順序性,但保證的是at least once。
因此ums_id_變得尤為重要。 對于全量抽取,ums_id是一個值,該值為全量拉取event的ums_id號,表示該批次的所有數(shù)據(jù)是一批的,因為數(shù)據(jù)都是不同的可以共享一個ums_id_號。ums_uid_流水號從zk中生成,保證了數(shù)據(jù)的唯一性。 對于增量抽取,我們使用的是 mysql的日志文件號 + 日志偏移量作為唯一id。Id作為64位的long整數(shù),高6位用于日志文件號,低13位作為日志偏移量。 例如:000103000012345678。 103 是日志文件號,12345678 是日志偏移量。 這樣,從日志層面保證了物理唯一性(即便重做也這個id號也不變),同時也保證了順序性(還能定位日志)。通過比較ums_id_就能知道哪條消息更新。
ums_ts_的價值在于從時間維度上可以準確知道event發(fā)生的時間。比如:如果想得到一個某時刻的快照數(shù)據(jù)。可以通過ums_ts 來知道截斷時間點。
二、日志類數(shù)據(jù)源的實現(xiàn)
業(yè)界日志收集、結構化、分析工具方案很多,例如:Logstash、Filebeat、Flume、Fluentd、Chukwa. scribe、Splunk等,各有所長。在結構化日志這個方面,大多采用配置正則表達式模板:用于提取日志中模式比較固定、通用的部分,例如日志時間、日志類型、行號等。對于真正的和業(yè)務比較相關的信息,這邊部分是最重要的,稱為message部分,我們希望使用可視化的方式來進行結構化。
例如:對于下面所示的類log4j的日志:
如果用戶想將上述數(shù)據(jù)轉換為如下的結構化數(shù)據(jù)信息:
我們稱這樣的日志為“數(shù)據(jù)日志”
DBUS設計的數(shù)據(jù)日志同步方案如下:
- 日志抓取端采用業(yè)界流行的組件(例如Logstash、Flume、Filebeat等)。一方面便于用戶和業(yè)界統(tǒng)一標準,方便用戶的整合;另一方面也避免無謂的重造輪子。抓取數(shù)據(jù)稱為原始數(shù)據(jù)日志(raw data log)放進Kafka中,等待處理。
- 提供可視化界面,配置規(guī)則來結構化日志。用戶可配置日志來源和目標。同一個日志來源可以輸出到多個目標。每一條“日志源-目標”線,中間數(shù)據(jù)經過的規(guī)則處理用戶根據(jù)自己的需求來自由定義。最終輸出的數(shù)據(jù)是結構化的,即:有schema約束,可以理解為類似數(shù)據(jù)庫中的表。
- 所謂規(guī)則,在DBUS中,即“規(guī)則算子”。DBUS設計了豐富易用的過濾、拆分、合并、替換等算子供用戶使用。用戶對數(shù)據(jù)的處理可分多個步驟進行,每個步驟的數(shù)據(jù)處理結果可即時查看、驗證;可重復使用不同算子,直到轉換、裁剪得到自己需要的數(shù)據(jù)。
- 將配置好的規(guī)則算子組運用到執(zhí)行引擎中,對目標日志數(shù)據(jù)進行預處理,形成結構化數(shù)據(jù),輸出到Kafka,供下游數(shù)據(jù)使用方使用。
系統(tǒng)流程圖如下所示:
根據(jù)配置,我們支持同一條原始日志,能提取為一個表數(shù)據(jù),或者可以提取為多個表數(shù)據(jù)。
每個表是結構化的,滿足相同的schema。
- 每個表是一個規(guī)則 算子組的合集,可以配置1個到多個規(guī)則算子組
- 每個規(guī)則算子組,由一組規(guī)則算子組合而成
拿到一條原始數(shù)據(jù)日志, 它最終應該屬于哪張表呢?
每條日志需要與規(guī)則算子組進行匹配:
- 符合條件的進入規(guī)則算子組的,最終被規(guī)則組轉換為結構化的表數(shù)據(jù)。
- 不符合的嘗試下一個規(guī)則算子組。
- 都不符合的,進入unknown_table表。
2.1 規(guī)則算子
規(guī)則算子是對數(shù)據(jù)進行過濾、加工、轉換的基本單元。常見的規(guī)則算子如下:
算子之間是獨立的,通過組合不同的算子達到更復雜的功能,對算子進行迭代使用最終達到對任意數(shù)據(jù)進行加工的目的。
我們試圖使得算子盡量滿足正交性或易用性(雖然正則表達式很強大,但我們仍然開發(fā)一些簡單算子例如trim算子來完成簡單功能,以滿足易用性)。
三、UMS統(tǒng)一消息格式
無論是增量、全量還是日志,最終輸出到結果kafka中的消息都是我們約定的統(tǒng)一消息格式,稱為UMS(unified message schema)格式。如下圖所示:
3.1 Protocol
數(shù)據(jù)的類型,被UMS的版本號
3.2 schema
1)namespace 由:類型. 數(shù)據(jù)源名.schema名 .表名.表版本號. 分庫號 .分表號 組成,能夠描述所有表。
例如:mysql.db1.schema1.testtable.5.0.0
2)fields是字段名描述。
- ums_id_ 消息的唯一id,保證消息是唯一的
- ums_ts_ canal捕獲事件的時間戳;
- ums_op_ 表明數(shù)據(jù)的類型是I (insert),U (update),B (before Update),D(delete)
- ums_uid_ 數(shù)據(jù)流水號,唯一值
3)payload是指具體的數(shù)據(jù)。
一個json包里面可以包含1條至多條數(shù)據(jù),提高數(shù)據(jù)的有效載荷。
四、心跳監(jiān)控和預警
RDBMS類系統(tǒng)涉及到數(shù)據(jù)庫的主備同步,日志抽取,增量轉換等多個模塊等。
日志類系統(tǒng)涉及到日志抽取端,日志轉換模模塊等。
如何知道系統(tǒng)正在健康工作,數(shù)據(jù)是否能夠實時流轉? 因此對流程的監(jiān)控和預警就尤為重要。
4.1 對于RDBMS類系統(tǒng)
心跳模塊從dbusmgr庫中獲得需要監(jiān)控的表列表,以固定頻率(比如每分鐘)向源端dbus庫的心跳表插入心跳數(shù)據(jù)(該數(shù)據(jù)中帶有發(fā)送時間),該心跳表也作為增量數(shù)據(jù)被實時同步出來,并且與被同步表走相同的邏輯和線程(為了保證順序性,當遇到多并發(fā)度時是sharding by table的,心跳數(shù)據(jù)與table數(shù)據(jù)走同樣的bolt),這樣當收到心跳數(shù)據(jù)時,即便沒有任何增刪改的數(shù)據(jù),也能證明整條鏈路是通的。
增量轉換模塊和心跳模塊在收到心跳包數(shù)據(jù)后,就會發(fā)送該數(shù)據(jù)到influxdb中作為監(jiān)控數(shù)據(jù),通過grafana進行展示。 心跳模塊還會監(jiān)控延時情況,根據(jù)延時情況給以報警。
4.2 對于日志類系統(tǒng)
從源端就會自動產生心跳包,類似RDBMS系統(tǒng),將心跳包通過抽取模塊,和算子轉換模塊同步到末端,由心跳模塊負責監(jiān)控和預警。
轉載于:https://www.cnblogs.com/yixinjishu/p/11540000.html
總結
以上是生活随笔為你收集整理的拆解大数据总线平台DBus的系统架构的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 大厂智力题讲解,学它!!!!(一)
- 下一篇: 管理软件软件开发_管理在软件开发中的作用