魅族大数据之流平台设计部署实践--转
原文地址:http://mp.weixin.qq.com/s/-RZB0gCj0gCRUq09EMx1fA
沈輝煌 ??魅族數(shù)據(jù)架構(gòu)師?
2010年加入魅族,負(fù)責(zé)大數(shù)據(jù)、云服務(wù)相關(guān)設(shè)計(jì)與研發(fā);
專注于分布式服務(wù)、分布式存儲(chǔ)、海量數(shù)據(jù)下rdb與nosql融合等技術(shù)。
主要技術(shù)點(diǎn):推薦算法、文本處理、ranking算法
?
本篇文章內(nèi)容來自第八期魅族開放日魅族數(shù)據(jù)架構(gòu)師沈輝煌的現(xiàn)場分享,由IT大咖說提供現(xiàn)場速錄,由msup整理編輯。
?
導(dǎo)讀:魅族大數(shù)據(jù)的流平臺(tái)系統(tǒng)擁有自設(shè)計(jì)的采集SDK,自設(shè)計(jì)支持多種數(shù)據(jù)源采集的Agent組件,還結(jié)合了Flume、Spark、Metaq、Storm、Kafka、Hadoop等技術(shù)組件,本文就魅族流平臺(tái)對(duì)大量數(shù)據(jù)的采集、實(shí)時(shí)計(jì)算、系統(tǒng)分析方法,全球多機(jī)房數(shù)據(jù)采集等問題進(jìn)行介紹。
?
流平臺(tái)是魅族大數(shù)據(jù)平臺(tái)的重要部分,包括數(shù)據(jù)采集、數(shù)據(jù)處理、數(shù)據(jù)存儲(chǔ)、數(shù)據(jù)計(jì)算等模塊,流平臺(tái)為大數(shù)據(jù)提供了強(qiáng)大的支撐能力。
?
文章還介紹了魅族大數(shù)據(jù)流平臺(tái)的架構(gòu)、設(shè)計(jì)方式、常用組件、核心技術(shù)框架等方面的內(nèi)容,還原魅族大數(shù)據(jù)平臺(tái)的搭建過程及遇到的問題。
?
一、魅族大數(shù)據(jù)平臺(tái)架構(gòu)
?
如圖所示便是魅族的大數(shù)據(jù)平臺(tái)架構(gòu)。
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
-
左邊是多樣性的數(shù)據(jù)源接入;
-
右上是離線數(shù)據(jù)的采集;
-
下面是流平臺(tái)(也是今天分享的主角);
-
中間是集群的部署;
-
右邊是ETL的數(shù)據(jù)挖掘、算法庫和一些數(shù)據(jù)模型;
-
左上角是數(shù)據(jù)開發(fā)平臺(tái),比如webIDE可以使得開發(fā)人員更便捷地做一些數(shù)據(jù)查詢和管理;
-
最右邊的是一個(gè)數(shù)據(jù)產(chǎn)品門戶,包括我們的用戶畫像、統(tǒng)計(jì)系統(tǒng)等,這里面包含大數(shù)據(jù)的很多組件,比如數(shù)據(jù)采集、數(shù)據(jù)處理、數(shù)據(jù)存儲(chǔ)、數(shù)據(jù)挖掘等,最后產(chǎn)生大數(shù)據(jù)的雛形。
?
二、流平臺(tái)介紹
?
?
流平臺(tái)是大數(shù)據(jù)平臺(tái)一個(gè)比較重要的部分,主要包括四個(gè)部分:數(shù)據(jù)采集、數(shù)據(jù)處理、數(shù)據(jù)存儲(chǔ)、計(jì)算能力。
?
-
數(shù)據(jù)采集
?
?“誰擁有了整個(gè)世界的數(shù)據(jù),他就是最大的贏家”,這句話雖然有點(diǎn)夸張,但是卻表達(dá)了數(shù)據(jù)采集的重要性。一個(gè)大數(shù)據(jù)平臺(tái)數(shù)據(jù)的多樣性、數(shù)據(jù)量的級(jí)別很大程度上決定了大數(shù)據(jù)的能力和豐富程度。
?
-
數(shù)據(jù)處理
?
這里講的數(shù)據(jù)處理并不是像末端那么專業(yè)的數(shù)據(jù)清洗,更多的是為后續(xù)入庫做一些簡單處理,以及實(shí)時(shí)計(jì)算。
?
-
數(shù)據(jù)存儲(chǔ)
計(jì)算能力,包括離線計(jì)算和實(shí)時(shí)計(jì)算
?
流平臺(tái)為大數(shù)據(jù)提供非常強(qiáng)大的支撐,數(shù)據(jù)統(tǒng)計(jì)分析、數(shù)據(jù)挖掘、神經(jīng)網(wǎng)絡(luò)的圖形計(jì)算等都可以依靠計(jì)算能力進(jìn)行。
?
實(shí)時(shí)計(jì)算是指在一定單位的時(shí)間延遲范圍內(nèi),基于增量的數(shù)據(jù)推算出結(jié)果,再結(jié)合歷史數(shù)據(jù)得到期望的分析結(jié)果。這個(gè)時(shí)間是根據(jù)業(yè)務(wù)需求而定。
?
1、流平臺(tái)架構(gòu)
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
上圖是我們的流平臺(tái)架構(gòu)圖
?
-
左邊是數(shù)據(jù)源,像NoSQL、RDB、文件類型;
-
最右邊是集群,下面還有其他的一些Hadoop(存儲(chǔ));
-
中間的框是核心,也就是流平臺(tái);
-
最上面的是AS-Manager(我們的流管理平臺(tái)),承載了非常多的管理功能;
-
下面是Zookeeper,這是一個(gè)非常流行的集成管理中心,魅族的一些架構(gòu)都會(huì)用到它,流平臺(tái)也不例外,Zookeeper可以說貫穿了我們整個(gè)流平臺(tái)的架構(gòu);
-
最下面是AS-Protocol,我們自己設(shè)計(jì)的流平臺(tái)的數(shù)據(jù)對(duì)象協(xié)議,打通了整個(gè)流平臺(tái)的數(shù)據(jù)鏈路;
-
中間四個(gè)框是核心的四個(gè)模塊:采集模塊、數(shù)據(jù)中轉(zhuǎn)模塊、緩存模塊、實(shí)時(shí)計(jì)算模塊,也叫合并層。
?
2、具體架構(gòu)介紹
?
這是我們的具體架構(gòu)圖。
?
業(yè)務(wù)規(guī)模:從這邊采集數(shù)據(jù)到經(jīng)過流平臺(tái)最后經(jīng)過實(shí)時(shí)計(jì)算或入庫,它的數(shù)據(jù)量量級(jí)在千億級(jí)別。
?
3、組件
?
-
數(shù)據(jù)源渠道
?
前面提到采集數(shù)據(jù)源渠道的多樣性決定了大數(shù)據(jù)平臺(tái)的相應(yīng)能力和綜合程度。我們這邊首先會(huì)有一個(gè)文件類的業(yè)務(wù)數(shù)據(jù),包括業(yè)務(wù)日志、業(yè)務(wù)數(shù)據(jù)、數(shù)據(jù)庫文件,這些都會(huì)經(jīng)過采集服務(wù)采集。
?
下面這一塊包括一些網(wǎng)站的js訪問、手機(jī)各APP埋點(diǎn)、特點(diǎn)的應(yīng)用日志文件(它會(huì)通過手機(jī)端的一些埋點(diǎn)上訪到我們的埋點(diǎn)服務(wù))。
?
-
數(shù)據(jù)采集
?
數(shù)據(jù)采集分為兩個(gè)部分:采集服務(wù)、獨(dú)立部署的埋點(diǎn)服務(wù)。圖中只顯示了一個(gè)埋點(diǎn)服務(wù),里面還會(huì)有很多的第三方業(yè)務(wù),第三方業(yè)務(wù)通過這個(gè)紅色的插件接入我們的采集。
?
-
數(shù)據(jù)中轉(zhuǎn)
?
通過采集模塊把數(shù)據(jù)流轉(zhuǎn)到中轉(zhuǎn)模塊,中轉(zhuǎn)模塊采用的是目前比較流行的flume組件,紅色sink是我們自己開發(fā)的。
?
-
Cache
?
sink把前面的數(shù)據(jù)轉(zhuǎn)給緩存層,緩存層里有metaq和Kafka。
?
-
Streaming
?
實(shí)時(shí)計(jì)算模塊上線了Spark和Storm,較早上線的是Spark,目前兩個(gè)都在用的原因是它會(huì)適應(yīng)不同的業(yè)務(wù)場景。
?
-
Store
?
最后面是我們提供給落地的store層,像HIVE、Hbase等等。
?
-
流管理平臺(tái)
?
最下面是流管理平臺(tái),圖中有四條線連著四個(gè)核心模塊,對(duì)這四個(gè)模塊進(jìn)行非常重要且非常豐富的邏輯管理,包括數(shù)據(jù)管理、對(duì)各節(jié)點(diǎn)的監(jiān)控、治理、實(shí)時(shí)命令的下發(fā)等。
?
?
三、流平臺(tái)設(shè)計(jì)
?
1、概念解讀
?
Message,就是一條消息,是最小的數(shù)據(jù)單位。業(yè)務(wù)方給的一條數(shù)據(jù)就是一個(gè)message;我們?nèi)ゲ杉募脑?#xff0c;一行數(shù)據(jù)就是一個(gè)message。
?
AS-Protocol,是我們自己設(shè)計(jì)的流平臺(tái)數(shù)據(jù)的對(duì)象,它會(huì)對(duì)一批量的message進(jìn)行打包,然后再加上一些必要的變量做一個(gè)封裝。
?
Evnet,會(huì)提供一個(gè)類似的標(biāo)準(zhǔn)接口,這個(gè)地方其實(shí)更多的是為了打通采集的流平臺(tái)。它最重要的一個(gè)變量是Topic,就是說我拿到了我的AS-Protocol就可以根據(jù)對(duì)應(yīng)的Topic發(fā)到相應(yīng)的登錄去緩存提取,因?yàn)槲覀兊腁S-Protocol除了起始端和結(jié)束端以外,中間層是不用解析協(xié)議的。
?
Type,數(shù)據(jù)格式目前是Json和Hive格式,可以根據(jù)業(yè)務(wù)去擴(kuò)展。
?
Compress,Hive格式在空間上也是非常有優(yōu)勢(shì)的,非常適合于網(wǎng)絡(luò)傳輸壓縮。當(dāng)壓縮數(shù)據(jù)源質(zhì)量沒有達(dá)到一定量的程度的時(shí)候會(huì)越壓越大,所以我們要判斷是否需要壓縮。我們壓縮采用的是一個(gè)全系統(tǒng)
Data_timestamp,數(shù)據(jù)的時(shí)間是最上面的message,每一個(gè)message會(huì)攜帶一個(gè)數(shù)據(jù)時(shí)間.這個(gè)比較好理解,就是入庫之后會(huì)用做數(shù)據(jù)統(tǒng)計(jì)和分析的。
?
Send_timestamp,發(fā)送時(shí)間會(huì)攜帶在我們的AS-Protocol里,它聲明了每一個(gè)數(shù)據(jù)包發(fā)送的時(shí)間。
?
Unique Key,每一個(gè)數(shù)據(jù)包都有一個(gè)唯一的標(biāo)識(shí),這個(gè)也是非常重要的,它會(huì)跟著AS-Protocol和Event走通整個(gè)平臺(tái)的數(shù)據(jù)鏈路,在做數(shù)據(jù)定位、問題定位的時(shí)候非常有用,可以明確查到每個(gè)數(shù)據(jù)包在哪個(gè)鏈路經(jīng)歷了什么事情。
?
Topic。這個(gè)不需多言。
?
Data_Group,數(shù)據(jù)分組是我們非常核心的一個(gè)設(shè)計(jì)思想,原則上我們是一個(gè)業(yè)務(wù)對(duì)應(yīng)一個(gè)數(shù)據(jù)分組。
?
Protobuf序列化,我們會(huì)對(duì)Event數(shù)據(jù)做一個(gè)PT序列化,然后再往上面?zhèn)?#xff0c;這是為了節(jié)省數(shù)據(jù)流量。
?
2、協(xié)議設(shè)計(jì)
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
如圖所示為Event、As-Protocol和Message的關(guān)系。
?
最上層是Event,里面有一個(gè)Unique Key和Topic包括了我們的As-Protocol,然后是數(shù)據(jù)格式、發(fā)動(dòng)時(shí)間是否壓縮、用什么方式壓縮,還攜帶一些額外的變量。最后面是一個(gè)Body,Body其實(shí)就是一個(gè)message的宿主,以字節(jié)流的方式存儲(chǔ)。這個(gè)就是我們一個(gè)數(shù)據(jù)對(duì)象的協(xié)議設(shè)計(jì)。
接下來看數(shù)據(jù)在整個(gè)架構(gòu)里是如何流轉(zhuǎn)和傳輸?shù)摹?/p>
?
首先是數(shù)據(jù)源渠道,最左邊的是message,任何業(yè)務(wù)方的數(shù)據(jù)過來都是一條message,經(jīng)過數(shù)據(jù)采集把一批message打包封裝成Event,再發(fā)給數(shù)據(jù)中轉(zhuǎn)模塊,也叫flume。把Event拆出來,有一個(gè)topic,最后把As-protocol放到相應(yīng)位置緩存,消費(fèi)對(duì)應(yīng)的Topic,拿到對(duì)應(yīng)的As-Protocol,并把這個(gè)數(shù)據(jù)包解析出來,得到一條一條的message,這時(shí)就可以進(jìn)行處理、入庫或?qū)崟r(shí)計(jì)算。
?
需要特別注意的是message和Event。每個(gè)Message的業(yè)務(wù)量級(jí)是不一樣的,有幾十B、幾百B、幾千B的差別,打包成As-Protocol的時(shí)候要試試批量的數(shù)目有多少,原則上壓縮后的數(shù)據(jù)有個(gè)建議值,這個(gè)建議值視業(yè)務(wù)而定,DataGroup打包的數(shù)量是可以配的。
?
3、數(shù)據(jù)分組設(shè)計(jì)
?
?
如圖所示是我們的DataGroup設(shè)計(jì)。首先看最上面,一個(gè)Topic可以定義N個(gè)DataGroup。往下是Topic和streaming Job一比一的關(guān)系,就是說一個(gè)實(shí)時(shí)的Group只需要對(duì)應(yīng)一個(gè)Topic,如果兩個(gè)業(yè)務(wù)不相關(guān)就對(duì)應(yīng)的兩個(gè)Topic,用兩個(gè)Job去處理,最后得到想要的關(guān)系。
?
從架構(gòu)圖可以看到DataGroup的扭轉(zhuǎn)關(guān)系。最初數(shù)據(jù)采集每一個(gè)節(jié)點(diǎn)會(huì)聲明它是屬于哪一個(gè)DataGroup,上傳數(shù)據(jù)會(huì)處于這個(gè)DataGroup,經(jīng)過數(shù)據(jù)中轉(zhuǎn)發(fā)給我們的分布式緩存也對(duì)應(yīng)了Topic下面不同的分組數(shù)據(jù)。最后Streaming交給我Topic,我可以帥選出在最上面的關(guān)系,去配置DataGroup,可以非常靈活地組合。這就是DataGroup的設(shè)計(jì)思想。
?
?
四、采集組件Agent
?
?
1、概述
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
如圖所示,這是完全由我們自己設(shè)計(jì)和實(shí)現(xiàn)的一款組件。右邊是采集組件,分為兩部分:一個(gè)是基于java環(huán)境的獨(dú)立工作程序;另一個(gè)是jar插件。插件叫Agen-Stub.jar;獨(dú)立層是Agent-File.zip,Agent-File有一個(gè)paresr支持不同的文件類型,目前支持的file和Binlog,可擴(kuò)展。根據(jù)需要可以增加parser,也是接入Agent-stub,擁有Agent-stub的一些特性。
?
如上圖右側(cè)的示意圖,Agent-stub接入多個(gè)Business,前面提到的一個(gè)埋點(diǎn)服務(wù)就是一個(gè)Business,它把數(shù)據(jù)交給Agent-stub,Agent-stub會(huì)往后發(fā)展,與file和mysQL相對(duì)應(yīng)的是file parser,出來是Agent-stub,流程是一樣的。
?
2、Agent-Stub.jar
?
接下來看Agent-Stub是如何設(shè)計(jì)的。
?
多線程、異步。這個(gè)毫無疑問,做插件化肯定是這樣考慮的,不能阻塞上層業(yè)務(wù)。
?
內(nèi)存小隊(duì)列+磁盤壓縮隊(duì)列。這是我們改進(jìn)最大的一個(gè)地方,早期版本中我們采用的是內(nèi)存大隊(duì)列,如果只有內(nèi)存大隊(duì)列缺點(diǎn)非常明顯:
?
程序正常啟動(dòng)的時(shí)候大隊(duì)列里的數(shù)據(jù)怎么辦?要等他發(fā)完嗎?還是不發(fā)完?當(dāng)大隊(duì)列塞滿的時(shí)候,還有對(duì)上層業(yè)務(wù)的侵入性怎么辦?程序遇到問題時(shí)怎么辦?大隊(duì)列可能是50萬、100萬甚至更多。
?
采用了內(nèi)存小隊(duì)列+磁盤壓縮隊(duì)列后可以解決正常程序的啟停,保證數(shù)據(jù)沒有問題,還可以解決空間的占用清空性的問題,以此同時(shí),磁盤壓縮隊(duì)列還可以在程序出錯(cuò)的時(shí)候加速發(fā)送。
?
解釋一下磁盤壓縮隊(duì)列, 這次我們?cè)O(shè)計(jì)協(xié)議的思想很簡單:壓縮之后得到一個(gè)字節(jié)速度,存在磁盤的文件里,這個(gè)文件按照小時(shí)存儲(chǔ),這時(shí)對(duì)于二次發(fā)送帶來的損耗并不大,不需要重新阻斷數(shù)據(jù)也不需要解析和壓縮,只需要讀出來發(fā)出去。后面還有一個(gè)提升就是磁盤發(fā)送隊(duì)列跟內(nèi)存發(fā)送隊(duì)列是單獨(dú)分開的,這樣更能提升二次數(shù)據(jù)的發(fā)送性能。
?
無損啟停。正常的啟動(dòng)和停止,數(shù)據(jù)是不會(huì)停止不會(huì)丟失的。
?
Agent的版本號(hào)自動(dòng)上報(bào)平臺(tái)。這個(gè)非常重要,我們?cè)缙诘陌姹臼菦]有的,可以想象一下當(dāng)你的Agent節(jié)點(diǎn)是幾千上萬,如果沒有一個(gè)平臺(tái)直觀地管理,那將是一個(gè)怎樣恐怖的局面。現(xiàn)在我們每一個(gè)Agent啟動(dòng)的時(shí)候都會(huì)創(chuàng)建一個(gè)node path,把版本號(hào)放到path里,在管理平臺(tái)解析這個(gè)path,然后做分類,我們的版本就是這樣上報(bào)的。
?
自動(dòng)識(shí)別接入源,智能歸類。這個(gè)其實(shí)和上面那點(diǎn)是一樣的,在早期版本中我們做一個(gè)Agent的標(biāo)識(shí),其實(shí)就是一個(gè)IP+一個(gè)POD,就是說你有幾千個(gè)IP+POD量表需要人工管理,工作量非常大且乏味。我們優(yōu)化了一個(gè)自動(dòng)識(shí)別,把DataGroup放到Agent的node path里,管理平臺(tái)可以做到自動(dòng)識(shí)別。
?
Agent的全面實(shí)時(shí)監(jiān)控。包括內(nèi)存隊(duì)列數(shù)、磁盤隊(duì)列數(shù)、運(yùn)行狀態(tài)、出錯(cuò)狀態(tài)、qps等,都可以Agent上報(bào),并且在管理平臺(tái)直觀地看到哪一個(gè)節(jié)點(diǎn)是什么樣子的。其做法也依賴于zookeeper的實(shí)現(xiàn)和承載,這里其實(shí)就是對(duì)zk node的應(yīng)用,我們有一個(gè)定時(shí)線程收集當(dāng)前Agent必要的數(shù)據(jù),然后傳到node的data上去,管理平臺(tái)會(huì)獲取這些date,最后做一個(gè)平臺(tái)化的展示。
?
支持實(shí)時(shí)命令。包括括限流,恢復(fù)限流、停止、調(diào)整心跳值等,大大提高了運(yùn)維能力。其實(shí)現(xiàn)原理也是依賴于Agent,這里我們創(chuàng)建一個(gè)Data Group,通過管理平臺(tái)操作之后把數(shù)據(jù)放到Data Group里,然后會(huì)有一個(gè)監(jiān)聽者去監(jiān)聽獲取數(shù)據(jù)的變化并作出相應(yīng)的邏輯。
?
兼容Docker。目前魅族在用Doker,Doker對(duì)我們這邊的Agent來講是一個(gè)挑戰(zhàn),它的啟動(dòng)和停止是非常態(tài)化的,就是你可能認(rèn)為相同的Docker容器不會(huì)重啟第二次。
?
3、Agent-File.zip???
?
接入Agent-Stub。Agent-file首先是接入Agent-stub,擁有Agent-stub的一些特性。
?
兼容Docker。因?yàn)閱?dòng)和停止的常態(tài),假設(shè)我們剛剛一個(gè)業(yè)務(wù)接入了Agent-stub,那停止的時(shí)候它會(huì)通知我,Agent-stub會(huì)把小隊(duì)列里的數(shù)據(jù)抓到磁盤壓縮隊(duì)列里去。但是這里需要注意的是:磁盤壓縮隊(duì)列不能放到Docker自己的文件系統(tǒng)里,不然它停了之后數(shù)據(jù)就沒有人能夠得到了。
?
當(dāng)Agent-stub停的時(shí)候,會(huì)有一個(gè)標(biāo)識(shí)說磁盤要做隊(duì)列,我們的數(shù)據(jù)有沒有發(fā)完,磁盤壓縮隊(duì)列里有一個(gè)評(píng)級(jí)的標(biāo)識(shí)文件,這時(shí)要用到Agent-file,Agent-file有一個(gè)單獨(dú)的掃描線程一個(gè)個(gè)地去掃描Docker目錄,掃到這個(gè)文件的時(shí)候判斷其數(shù)據(jù)有沒有發(fā)完,如果沒發(fā)完就只能當(dāng)做一個(gè)發(fā)送者。
?
支持重發(fā)歷史數(shù)據(jù)。做大數(shù)據(jù)的可能都知道這些名詞,比如昨天的數(shù)據(jù)已經(jīng)采集完了,但由于某些原因有可能數(shù)據(jù)有遺漏,需要再跑一次后端的補(bǔ)貼邏輯,或者上馬訓(xùn)練,這時(shí)就要做數(shù)據(jù)重發(fā)。我們?cè)诠芾砥脚_(tái)上就會(huì)有一個(gè)支持這種特定文件或特定時(shí)間段的選擇,Agent接收到這個(gè)命令的時(shí)候會(huì)把相應(yīng)的數(shù)據(jù)發(fā)上去,當(dāng)然前提是數(shù)據(jù)不要被清了。
?
管理平臺(tái)自助升級(jí)。這個(gè)可以理解成軟件升級(jí),Agent可以說是非常常見的組件,但是我們重新設(shè)計(jì)時(shí)把自動(dòng)升級(jí)考慮在內(nèi),這也是我們?yōu)槭裁丛O(shè)計(jì)自己做而不是用開源的組件。這樣做帶來的好處是非常大的,我們幾千個(gè)Agent在平臺(tái)里只需要一鍵就可以完成自動(dòng)升級(jí)。
?
文件名正則表達(dá)式匹配。文件名的掃描是用自動(dòng)表達(dá)式。
?
源目錄定時(shí)掃描 and Jnotify。重點(diǎn)介紹文件掃描機(jī)制。早期的版本是基于Agent-fire和KO-F兩者結(jié)合做的數(shù)據(jù)采集:Agent-file是加碼里對(duì)文件變更的事件鑒定,包括重命名、刪除、創(chuàng)建都有一個(gè)事件產(chǎn)生;KO-F是拿到文件下的最佳數(shù)據(jù)。假設(shè)源目錄里有一千個(gè)文件,KO-F現(xiàn)場就是一千個(gè),Agent-file對(duì)應(yīng)的文件變革賦予的追加、重命名等都可能會(huì)產(chǎn)生一系列事件,邏輯復(fù)雜。
?
所以我們?cè)O(shè)計(jì)了源目錄定時(shí)掃描的機(jī)制,首先有一個(gè)目標(biāo),就是我們的文件隊(duì)列,包括為未讀文件、已讀文件做區(qū)別,區(qū)別之后掃描,當(dāng)然還會(huì)有像文件摘要等的存在這里不細(xì)講,掃描之后更新未讀文件、已讀文件列表。
?
之所以加Jnotify是因?yàn)槲覀儼l(fā)現(xiàn)只用定制掃描不能解決所有業(yè)務(wù)場景的問題,jootify在這里起到補(bǔ)充定制掃描的作用,解決文件風(fēng)險(xiǎn)和文件產(chǎn)程的問題。
?
單文件讀取。早期版本中這一點(diǎn)依賴于文件列表,當(dāng)文件非常多時(shí)程序變得非常不穩(wěn)定,因?yàn)榭赡芤_幾百個(gè)或幾千個(gè)線程。后來我們改成了單文件的讀取,上文提到的掃描機(jī)制會(huì)產(chǎn)生一個(gè)文件隊(duì)列,然后從文件隊(duì)列里讀取,這樣一個(gè)個(gè)文件、一段段圖,程序就非常穩(wěn)定了。
?
文件方式存儲(chǔ)offset,無損啟停。早期采用切入式PTE做存儲(chǔ),銜接非常重,后來我們改成文件方式存儲(chǔ),設(shè)計(jì)非常簡單就只有兩個(gè)文件:一個(gè)是目錄下面所有文件的offset;一個(gè)是正在讀的文件的offset。這里涉及到無損啟停和策略的問題,我們定了一個(gè)5次算法:就是每讀了5次就會(huì)刷盤一次,但只刷在讀文件,別的文件不會(huì)變化,所以可以想象得到,當(dāng)這個(gè)程序被替換走的時(shí)候,最多也就是重復(fù)5條數(shù)據(jù),大會(huì)導(dǎo)致數(shù)據(jù)丟失。
?
4、Agent示意圖
?
?
如圖是Agent示意圖。上面是Agent-file和數(shù)據(jù)對(duì)象。Agent啟動(dòng)的時(shí)候要把里面的offset文件取來,就會(huì)產(chǎn)生未讀文件和已讀文件列表,掃描文件目錄,然后更新文件隊(duì)列,還有一個(gè)fileJNotify是相對(duì)應(yīng)的文件隊(duì)列。然后有一個(gè)比較重要的fileReader,我會(huì)先從文件隊(duì)列里拿到再去讀實(shí)際文件,讀完刷盤之后這一塊就成功了,我會(huì)根據(jù)我的刷盤去刷新offset。
?
上圖左邊有一個(gè)業(yè)務(wù)加了一個(gè)Agent-stub,最后變成flume,這里有一個(gè)QueueReceiver(隊(duì)列接收者),filereader和業(yè)務(wù)方的DataSender會(huì)把message發(fā)過來,QueueReceiver接受的數(shù)據(jù)就是一條條的message,然后發(fā)送到內(nèi)存小隊(duì)列里,當(dāng)這邊的小隊(duì)列滿了怎么辦呢?中間有一個(gè)額外的固定大小的性能提升的地方用于message歸類,當(dāng)這個(gè)fIieReader往這個(gè)內(nèi)存小隊(duì)列發(fā)的時(shí)候發(fā)現(xiàn)塞不進(jìn)去了,就會(huì)在規(guī)定大小的隊(duì)列里發(fā),當(dāng)一個(gè)固定大小的隊(duì)列滿了之后就會(huì)打包壓縮,以字節(jié)處理的方式存到磁盤壓縮隊(duì)列。
?
再來說說我們?yōu)槭裁磿?huì)提出二次數(shù)據(jù)的發(fā)送,其實(shí)就是多了一個(gè)countsender即壓縮隊(duì)列的發(fā)送者,直接的數(shù)據(jù)來源是磁盤壓縮隊(duì)列,與上面的并生沒有任何沖突。Countsender的數(shù)據(jù)對(duì)賬功能是我們整個(gè)平臺(tái)的核心功能之一,基于這個(gè)統(tǒng)計(jì)的數(shù)據(jù)確保了其完整性,少一條數(shù)據(jù)我們都知道,在采集層有一個(gè)countsender,以另外一個(gè)渠道發(fā)出去,和真正的數(shù)據(jù)源渠道不一樣,會(huì)更加的輕量化更加可靠,且數(shù)值非常小。????
??
最后是前文提到的監(jiān)控和命令的實(shí)現(xiàn),一邊是Agentnode,一邊是數(shù)據(jù)管理。
?
5、Agent的坑
?
丟數(shù)據(jù)。如前文提到內(nèi)存大隊(duì)列帶來的問題。
?
版本管理的問題。
?
tailf -f的問題。
?
網(wǎng)絡(luò)原因?qū)е聑k刪節(jié)點(diǎn)問題。網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,ZK會(huì)有一個(gè)節(jié)點(diǎn)的心跳檢測(cè),不穩(wěn)定的時(shí)候監(jiān)測(cè)會(huì)以為節(jié)點(diǎn)已經(jīng)不存在了而把節(jié)點(diǎn)刪掉,這會(huì)導(dǎo)致管理平臺(tái)的節(jié)點(diǎn)監(jiān)控、文件下發(fā)全部都失效。解決辦法就是在message加一層控制檢查線程,發(fā)現(xiàn)節(jié)點(diǎn)不在了再創(chuàng)建一遍。
?
亂碼的問題。可能會(huì)跟一些遠(yuǎn)程訪問的軟件相關(guān),原則上我們假設(shè)第二次啟動(dòng)的時(shí)候沒有配置我們的編碼,默認(rèn)與系統(tǒng)一致,但當(dāng)遠(yuǎn)程軟件啟動(dòng)的時(shí)候可能會(huì)發(fā)生不一樣的地方,所以不要依賴于默認(rèn)值,一定要在啟動(dòng)程序里設(shè)置希望的編碼。
?
日志問題,在插件化的時(shí)候肯定要考慮到業(yè)務(wù)方的日志,我們把業(yè)務(wù)方的日志刷死了,當(dāng)網(wǎng)絡(luò)出現(xiàn)問題的時(shí)候每發(fā)送一條就失敗一條,那是不是都要打印出來?我們的考慮是第一條不打印,后面可能十條打印一次,一百條打印一次,一千條打印一次,這個(gè)量取決于業(yè)務(wù)。補(bǔ)充一點(diǎn),我們有一個(gè)統(tǒng)計(jì)線程,可以根據(jù)統(tǒng)計(jì)線程觀察Agent的正常與否。
?
?
五、流管理平臺(tái)
?
?
如圖所示,我們的流管理平臺(tái)界面比較簡單,但功能非常豐富,包括:
-
接入業(yè)務(wù)的管理、發(fā)布、上線;
-
對(duì)Agent節(jié)點(diǎn)進(jìn)行實(shí)時(shí)監(jiān)測(cè)、管理、命令;
-
對(duì)Flume進(jìn)行監(jiān)測(cè)、管理;
-
對(duì)實(shí)時(shí)計(jì)算的job的管理;
-
對(duì)全鏈路的數(shù)據(jù)流量對(duì)帳,這是我們自檢的功能;
-
智能監(jiān)控報(bào)警,我們有一個(gè)非常人性化的報(bào)警閥值的建議。取一個(gè)平均值,比如一周或一天,設(shè)定一個(gè)閥值,比如一天的流量訪問次數(shù)可能是一千次,我們?cè)O(shè)計(jì)的報(bào)警是2000次,當(dāng)連續(xù)一周都是2000次的時(shí)候就得改進(jìn)。
?
?
六、數(shù)據(jù)中轉(zhuǎn)
?
?
1、背景
?
業(yè)務(wù)發(fā)展可能從1到100再到1000,或者當(dāng)公司互聯(lián)網(wǎng)發(fā)展到一定程度的時(shí)候業(yè)務(wù)可能遍布世界各地,魅族的云服務(wù)數(shù)據(jù)分為海外服務(wù)和國內(nèi)服務(wù),我們把業(yè)務(wù)拆分開來,大數(shù)據(jù)采集肯定也要跟著走,這就面臨著數(shù)據(jù)中轉(zhuǎn)的問題。
?
?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
如圖所示是我們兩個(gè)案例的示意圖。黑色的是內(nèi)網(wǎng)的線,橙色的是跨界性的線,有公網(wǎng)的、云端的、專線的,各種各樣的網(wǎng)絡(luò)情況。
?
上面的是Agent集群,B-IDC也有一個(gè)Agent集群,直接訪問我們登錄的集群。
?
這里第一個(gè)問題是我們的連接非常多,訪問Agent節(jié)點(diǎn)的時(shí)候有幾千個(gè)Agent節(jié)點(diǎn)就得訪問幾千個(gè)節(jié)點(diǎn),這是不太友好的事情。另一個(gè)問題是當(dāng)我們做升級(jí)遷移的時(shí)候,Agent要做修改和配置,必須得重啟,當(dāng)整個(gè)B-IDC遷移到A-IDC,我們加了一個(gè)Flyme集群。同樣是一個(gè)Agent集群,下面有一個(gè)Flume集群,這樣的好處:一是里面的連接非常少,線上的Flume一個(gè)ID就三臺(tái);二是這邊承載了所有的Agent,除了Agent還有其他的采集都在A-IDC里中轉(zhuǎn),當(dāng)這個(gè)片區(qū)要做升級(jí)的時(shí)候上面的業(yè)務(wù)是透明的,靈活性非常高。
?
2、Flume介紹
?
Flume里有三個(gè)核心的部分:Source、Channel、Sink,Source是數(shù)據(jù)結(jié)構(gòu)源;Channel相當(dāng)于內(nèi)存大隊(duì)列,Sink是輸出到不同的目標(biāo)。官方提供了很多組件:Avro、HTTP、Thrift、Memory、File、Spillable Memory、Avro、Thrift、Hdfs、Hive。
?
3、Flume實(shí)踐
?
無Group,采用Zookeeper做集群
?
Agent采用LB做負(fù)載均衡,動(dòng)態(tài)感知。結(jié)合Zookeeper可以感知到Agent列表,這時(shí)會(huì)采用負(fù)載均衡的做法找到當(dāng)前的那個(gè)Flume,到后端的Flume直接變化的時(shí)候可以感知到從而下線。
?
硬盤緩存、無損啟停。采用memory可能會(huì)帶來些不好的問題,如果內(nèi)存隊(duì)列改成文件就沒有這個(gè)問題。因?yàn)閮?nèi)存速度快,存儲(chǔ)強(qiáng)制刷新的時(shí)候就沒有數(shù)據(jù)了,所以我們做了優(yōu)化:還是采用memory,在Flume停的時(shí)候把數(shù)據(jù)采集下來,下一次啟動(dòng)的時(shí)候把數(shù)據(jù)發(fā)出去,這時(shí)就可以做到無損啟停,但是有一點(diǎn)千萬要注意:磁盤其實(shí)是固化在機(jī)器里面,當(dāng)這臺(tái)機(jī)器停下不再啟動(dòng)的時(shí)候,別忘了把數(shù)據(jù)移走發(fā)出去。
?
停止順序優(yōu)化。在做優(yōu)化的時(shí)候遇到源碼的修改,其實(shí)就是Flume停止順序的優(yōu)化。原生里好像先停止Channel,然后提高sink,這就會(huì)導(dǎo)致想要做這個(gè)功能的時(shí)候做不到。我們應(yīng)該先把這個(gè)數(shù)據(jù)改掉再去停止sink最后停止Channel,這樣就保證Channel里的數(shù)據(jù)可以全部固化到硬盤里。
?
多種轉(zhuǎn)發(fā)方式。我們現(xiàn)在是全球的RBC,支持公網(wǎng)、內(nèi)網(wǎng)、跨域性專線,我們提供一個(gè)非常好的功能:http sink,它也是一個(gè)安全的支持ssl的轉(zhuǎn)換方式。
?
自定義Sink,多線程發(fā)送(channel的get只能單線程)。
?
4、停止順序
?
?
如圖是停止順序的修改。這是一個(gè)sourceRunner、sink、channel。
?
5、Memory的capacity
?
?
選擇內(nèi)存之后,這個(gè)內(nèi)存大小到底多少比較合適?如圖所示,左邊Flume是從500-1000,channel容量是5萬、10萬,還有Agent的個(gè)數(shù)、線程,我們發(fā)現(xiàn)在10萬的時(shí)候它的fullGC是非常頻繁的,所以我們最后定的大小是5萬。當(dāng)然不同的機(jī)器根據(jù)不同的測(cè)試得到自己的值,這個(gè)值不是恒定的。
?
包大小從10K到30K到50K有什么不一樣呢?很明顯TPS從1萬多降到了2000多,因?yàn)榘酱缶W(wǎng)卡就越慢了,這里看到其實(shí)已經(jīng)到了200兆(雙網(wǎng)卡),把網(wǎng)卡跑滿了。我們做流平臺(tái)設(shè)計(jì)的時(shí)候,不希望鏈路被跑滿,所以我們給了個(gè)建議值,大小在5-10K。當(dāng)然,線上我們采用的萬兆網(wǎng)卡。
?
?
七、實(shí)時(shí)計(jì)算
?
?
1、實(shí)時(shí)計(jì)算集群
?
在SparkZK里直接寫HA,可以減少不必要的MR提高IO,減少IO消耗。
Kafka+Strom (ZK)
?
2、Spark實(shí)踐
?
直接寫HDFS底層文件
?
自動(dòng)創(chuàng)建不存在的Hive分區(qū)
?
相應(yīng)Metaq的日志切割,這一點(diǎn)上現(xiàn)在的Kafka是沒有問題的,當(dāng)時(shí)的日志切割會(huì)導(dǎo)致網(wǎng)絡(luò)連接超時(shí),我們查看源代碼發(fā)現(xiàn)確實(shí)會(huì)堵塞,我們的解決方法是把切割調(diào)成多色或分區(qū)調(diào)多。
?
不要定時(shí)的killJob。早期的Spark版本因?yàn)榇笈康膋illJob導(dǎo)致一些不穩(wěn)定的情況,某些job其實(shí)是沒有被完全覆蓋,假死在那里的。
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/6877128.html
總結(jié)
以上是生活随笔為你收集整理的魅族大数据之流平台设计部署实践--转的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: excel工具类
- 下一篇: Nashorn——在JDK 8中融合Ja