数据仓库之电商数仓-- 1、用户行为数据采集
目錄
- 一、數據倉庫概念
- 二、項目需求及架構設計
- 2.1 項目需求分析
- 2.2 項目框架
- 2.2.1 技術選型
- 2.2.2 系統數據流程設計
- 2.2.3 框架版本選型
- 2.2.4 服務器選型
- 2.2.5 集群規模
- 2.2.6 集群資源規劃設計
- 三、數據生成模塊
- 3.1 目標數據
- 3.1.1 頁面日志
- 3.1.2 事件日志
- 3.1.3 曝光日志
- 3.1.4 啟動日志
- 3.1.5 錯誤日志
- 3.2數據埋點
- 3.2.1 主流埋點方式
- 3.2.2 埋點數據上報時機
- 3.2.3 埋點數據日志結構
- 3.3 服務器和JDK準備
- 3.4 模擬數據
- 3.4.1 使用說明
- 3.4.2 集群日志生成腳本
- 四、數據采集模塊
- 4.1 集群所有進程查看腳本
- 4.2 zookeeper安裝
- 4.2.1 安裝ZK
- 4.2.2 ZK集群啟動停止腳本
- 4.3 Kafka安裝
- 4.3.1 Kafka集群安裝
- 4.3.2 Kafka集群啟動停止腳本
- 4.3.3 常用命令
- 4.3.4 項目經驗之Kafka機器數量計算
- 4.3.5 項目經驗之壓力測試
- 4.3.6 項目經驗之Kafka分區數計算
- 4.4采集日志Flume
- 4.4.1 日至采集Flume安裝
- 4.4.2 項目經驗之Flume組件選型
- 4.4.3 日志采集 Flume 配置
- 4.4.4 Flume攔截器
- 4.4.5 測試Flume-Kafka通道
- 4.4.6 日至采集Flume啟動停止腳本
- 4.5 消費Kafka數據Flume
- 4.5.1 項目經驗之Flume組件選型
- 4.5.1.1 FileChannel 和 MemoryChannel 區別
- 4.5.1.2 FileChannel優化
- 4.5.1.3 Sink:HDFS Sink
- 4.5.2 Flume時間戳攔截器
- 4.5.3 消費者Flume配置
- 4.5.4 消費者Flume啟動停止腳本
- 4.6 采集通道啟動停止腳本
- 五、常見問題及解決方案
- 5.1 2NN頁面不能顯示完整信息
-----------------------------------------------------分隔符-----------------------------------------------------
數據倉庫之電商數倉-- 1、用戶行為數據采集==>
數據倉庫之電商數倉-- 2、業務數據采集平臺==>
數據倉庫之電商數倉-- 3.1、電商數據倉庫系統(DIM層、ODS層、DWD層)==>
數據倉庫之電商數倉-- 3.2、電商數據倉庫系統(DWS層)==>
數據倉庫之電商數倉-- 3.3、電商數據倉庫系統(DWT層)==>
數據倉庫之電商數倉-- 3.4、電商數據倉庫系統(ADS層)==>
數據倉庫之電商數倉-- 4、可視化報表Superset==>
數據倉庫之電商數倉-- 5、即席查詢Kylin==>
一、數據倉庫概念
數據倉庫(Data Warehouse),是為企業制定決策,提供數據支持的??梢詭椭髽I改進業務流程、提高產品質量等。
數據倉庫的輸入數據通常包括:業務數據、用戶行為數據和爬蟲數據等。
業務數據:就是各行業在處理事務過程中產生的數據。如用戶在電商網站中登錄、下載、支付等過程中,需要和網站后臺數據庫進行增刪改查交互,產生的數據就是業務數據。業務數據通常存儲在MySQL、Oracle等數據庫中。
二、項目需求及架構設計
2.1 項目需求分析
項目需求
2.2 項目框架
2.2.1 技術選型
??:
技術選型主要考慮因素:數據量大小、業務需求、行業內經驗、技術成熟度、開發維護成本、總成本預算。
數據采集傳輸:Flume, Kafka, Sqoop, Logstash, DataX;
數據存儲:MySQL, HDFS, HBase, Redis, MongoDB;
數據計算:Hive, Tex, Spark, Flink, Storm;
數據查詢:presto, Kylin, Impala, Druid, Clickouse, Doris;
數據可視化:Echarts, Superset, QuickBI, DataV;
任務調度:Azkaban, Oozie, DolphinScheduler, Airflow;
集群監控:Zabbix, Prometheus;
元數據管理:Atalas;
權限管理:Ranger, Sentry.
2.2.2 系統數據流程設計
2.2.3 框架版本選型
1). Apache:運維麻煩,組件間兼容性需要自己調研,開源;
2). CDH:國內使用最多的版本,不開源;
3). HDP:開源,可進行二次開發,但沒有CDH穩定,國內使用甚少。
1). 阿里云EMR、MaxCompute、DataWorks
2). 亞馬遜云EMR
3). 騰訊云EMR
4). 華為云EMR
Apache框架版本:
tips:
框架選型最好選擇半年前的穩定版!
2.2.4 服務器選型
2.2.5 集群規模
1). 每天日活躍用戶100萬,每人一天平均100條:100萬*100條=1億條;
2). 每條日志1k左右,每天1億條:100000000 / 1024 /1024 = 約100G;
3). 半年內不擴容服務器:100G*180天=約18T;
4). 保存3個副本:18T*3=54T;
5). 預留20%~30%Buf = 54T/0.7 = 77T;
6). 約8T*10臺服務器。
2.2.6 集群資源規劃設計
在企業中通常會搭建一套生產集群和一套測試集群。生產集群運行生產任務,測試集群用于上線前代碼編寫和測試。
1). 消耗內存的需分開;
2). 數據傳輸數據比較緊密的放在一起(Kafka、Zookeeper);
3). 客戶端盡量放在一到兩臺服務器上,方便外部訪問;
4). 有依賴關系的盡量放在同一臺服務器上(如Hive和Azkaban Executor)。
三、數據生成模塊
3.1 目標數據
我們要收集和分析數據主要包括頁面數據、時間數據、曝光數據、啟動數據和錯誤數據。
3.1.1 頁面日志
頁面數據主要記錄一個頁面的用戶訪問情況,包括訪問時間、停留時間、頁面路徑等信息。
3.1.2 事件日志
時間數據主要記錄應用內一個具體操作行為,包括操作類型
操作對象、操作對象描述等信息。
3.1.3 曝光日志
曝光數據主要記錄頁面所曝光的內容,包括曝光對象,曝光類型等信息。
3.1.4 啟動日志
啟動數據記錄應用的啟動信息。
3.1.5 錯誤日志
錯誤數據記錄應用使用過程中的錯誤信息,包括錯誤編號及錯誤信息。
3.2數據埋點
3.2.1 主流埋點方式
目前主流的埋點方式,有代碼埋點(前端/后端)、可視化埋點、全埋點三種。
代碼埋點是通過調用埋點 SDK 函數,在需要埋點的業務邏輯功能位置調用接口,上報埋點數據。例如,我們對頁面中的某個按鈕埋點后,當這個按鈕被點擊時,可以在這個按鈕 對應的 OnClick 函數里面調用 SDK 提供的數據發送接口,來發送數據。
可視化埋點只需要研發人員集成采集 SDK,不需要寫埋點代碼,業務人員就可以通過 訪問分析平臺的“圈選”功能,來“圈”出需要對用戶行為進行捕捉的控件,并對該事件進行命名。圈選完畢后,這些配置會同步到各個用戶的終端上,由采集 SDK 按照圈選的配置 自動進行用戶行為數據的采集和發送。
全埋點是通過在產品中嵌入 SDK,前端自動采集頁面上的全部用戶行為事件,上報埋點數據,相當于做了一個統一的埋點。然后再通過界面配置哪些數據需要在系統里面進行分析。
3.2.2 埋點數據上報時機
埋點數據上報時機包括兩種方式:
方式一,在離開該頁面時,上傳在這個頁面產生的所有數據(頁面、事件、曝光、錯誤 等)。
優點:批處理,減少了服務器接收數據壓力,網絡IO少;
缺點:實效性差。
方式二,每個事件、動作、錯誤等產生后,立即發送。
優點:響應及時;
缺點:對服務器接收數據壓力比較大,網絡IO增加。
本項目采用方式一埋點。
3.2.3 埋點數據日志結構
我們的日志結構大致可分為兩類,一是普通頁面埋點日志,二是啟動日志。
普通頁面日志結構如下,每條日志包含了,當前頁面的頁面信息,所有事件(動作)、 所有曝光信息以及錯誤信息。除此之外,還包含了一系列公共信息,包括設備信息,地理位 置,應用信息等,即下邊的 common 字段。
3.3 服務器和JDK準備
分別安裝 hadoop102、hadoop103、hadoop104 三臺主機。
服務器和JDK的準備內容看這里==>
3.4 模擬數據
3.4.1 使用說明
1). 將 application.yml、gmall2020-mock-log-2021-01-22.jar、path.json、logback.xml 上傳到 hadoop102 的/opt/module/applog 目錄下??
2). 上傳文件
1). 配置application.yml文件:
# 外部配置打開 logging.config: "./logback.xml" #業務日期 mock.date: "2020-06-14"#模擬數據發送模式 #mock.type: "http" #mock.type: "kafka" mock.type: "log"#http模式下,發送的地址 mock.url: "http://localhost:8080/applog"#kafka模式下,發送的地址 mock:kafka-server: "hadoop102:9092,hadoop103:9092,hadoop104:9092"kafka-topic: "ODS_BASE_LOG"#啟動次數 mock.startup.count: 200 #設備最大值 mock.max.mid: 500000 #會員最大值 mock.max.uid: 100 #商品最大值 mock.max.sku-id: 35 #頁面平均訪問時間 mock.page.during-time-ms: 20000 #錯誤概率 百分比 mock.error.rate: 3 #每條日志發送延遲 ms mock.log.sleep: 10 #商品詳情來源 用戶查詢,商品推廣,智能推薦, 促銷活動 mock.detail.source-type-rate: "40:25:15:20" #領取購物券概率 mock.if_get_coupon_rate: 75 #購物券最大id mock.max.coupon-id: 3 #搜索關鍵詞 mock.search.keyword: "圖書,小米,iphone11,電視,口紅,ps5,蘋果手機,小米盒子"2). 配置path.json文件:
[{"path":["home","good_list","good_detail","cart","trade","payment"],"rate":20 },{"path":["home","search","good_list","good_detail","login","good_detail","cart","trade","payment"],"rate":40 },{"path":["home","mine","orders_unpaid","trade","payment"],"rate":10 },{"path":["home","mine","orders_unpaid","good_detail","good_spec","comment","trade","payment"],"rate":5 },{"path":["home","mine","orders_unpaid","good_detail","good_spec","comment","home"],"rate":5 },{"path":["home","good_detail"],"rate":10 },{"path":["home" ],"rate":10 }3). 配置logback文件:
<?xml version="1.0" encoding="UTF-8"?> <configuration><property name="LOG_HOME" value="/opt/module/applog/log" /><appender name="console" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%msg%n</pattern></encoder></appender><appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern></rollingPolicy><encoder><pattern>%msg%n</pattern></encoder></appender><!-- 將某一個包下日志單獨打印日志 --><logger name="com.xiaobai.gmall2020.mock.log.util.LogUtil"level="INFO" additivity="false"><appender-ref ref="rollingFile" /><appender-ref ref="console" /></logger><root level="error" ><appender-ref ref="console" /></root> </configuration> ~1). 如圖,在/opt/module/applog目錄下執行以下命令生成對應的日志文件log:
java -jar gmall2020-mock-log-2021-01-22.jar2). 在/opt/module/applog/log目錄下查看生成日志:
3.4.2 集群日志生成腳本
注:
1). /opt/module/applog/為 jar 包及配置文件所在路徑
2). /dev/null 代表 linux 的空設備文件,所有往這個文件里面寫入的內容都會丟失,俗
稱“黑洞”。
標準輸入 0:從鍵盤獲得輸入 /proc/self/fd/0;
標準輸出 1:輸出到屏幕(即控制臺) /proc/self/fd/1;
錯誤輸出 2:輸出到屏幕(即控制臺) /proc/self/fd/2。
如圖,hadoop102 / hadoop103加載出了log:
四、數據采集模塊
4.1 集群所有進程查看腳本
4.2 zookeeper安裝
4.2.1 安裝ZK
集群規劃
在hadoop102、hadoop103、hadoop104三個節點上部署zookeeper。
解壓安裝
1). 解壓zookeeper安裝包到/opt/module/目錄下:
[xiaobai@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/2). 修改/opt/module/apache-zookeeper-3.5.7-bin名稱為zookeeper-3.5.7:
[xiaobai@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.71). 在/opt/module/zookeeper-3.5.7目錄下創建zkData:
[xiaobai@hadoop102 zookeeper-3.5.7]$ mkdir zkData2). 在/opt/module/zookeeper-3.5.7/zkData目錄下創建一個myid文件:
[xiaobai@hadoop102 zookeeper-3.5.7]$ vim myid添加myid文件,??一定要在Linux里面創建!
在文件中添加域server對應的編號:
3). 同步/opt/module/zookeeper-3.5.7目錄到hadoop103、hadop104:
[xiaobai@hadoop102 module]$ xsync zookeeper-3.5.7/分別在hadoop103、hadoop104上修改myid文件中內容為3、4:
[xiaobai@hadoop103 zkData]$ vim myid 3 [xiaobai@hadoop104 zkData]$ vim myid 41). 重命名/opt/module/zookeeper-3.5.7/conf目錄下的zoo_sample.cfg為zoo.cfg :
[xiaobai@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg2). 打開zoo.cfg 文件:
[xiaobai@hadoop102 conf]$ vim zoo.cfg如圖,修改數據存儲路徑配置:
dataDir=/opt/module/zookeeper-3.5.7/zkData如圖,增加如下配置:
#######################cluster################################### server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:38883). 同步分發zoo.cfg 配置文件:
[xiaobai@hadoop102 conf]$ xsync zoo.cfg4). 配置參數解讀:
server.A=B:C:DA 是一個數字,表示這是第幾號服務器;
集群模式下配置一個文件 myid,這個文件在 dataDir 目錄下,這個文件里面有一個數據就是 A 的值,Zookeeper 啟動時讀取此文件,拿到里面的數據與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個 server;
B 是這個服務器的地址;
C 是這個服務器 Follower 與集群中的 Leader 服務器交換信息的端口;
D 是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。
1). 分別啟動zookeeper:
[xiaobai@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start [xiaobai@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start [xiaobai@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh start2). 查看狀態:
[xiaobai@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower[xiaobai@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower[xiaobai@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: leader4.2.2 ZK集群啟動停止腳本
在腳本中編寫如下內容:
#!/bin/bashcase $1 in "start")for i in hadoop102 hadoop103 hadoop104doecho "---------- $i ------------"ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"done ;; "stop")for i in hadoop102 hadoop103 hadoop104doecho "---------- $i ------------"ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"done ;; "status")for i in hadoop102 hadoop103 hadoop104doecho "---------- $i ------------"ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"done ;; esac4.3 Kafka安裝
4.3.1 Kafka集群安裝
依次在 hadoop102、hadoop103、hadoop104 節點上啟動 kafka:
在/opt/module/kafka目錄下執行以下命令:
4.3.2 Kafka集群啟動停止腳本
4.3.3 常用命令
/opt/module/kafka/目錄下創建日志主題:
注??:–from-beginning:會把主題中以往所有的數據都讀取出來。根據業務場景選擇是否增加該配置。
4.3.4 項目經驗之Kafka機器數量計算
Kafka機器數量(經驗公式)= 2 *(峰值生產速度 * 副本數 / 100)+ 1;
先拿到峰值生產速度,再根據設定的副本數,就能預估出需要部署Kafka的數量。
峰值生產速度:可以通過壓測得到;
副本數:
默認副本數是1個,在企業里會有2-3個,通常為2;
副本多可以提高可靠性,但是會降低網絡傳輸速率;
如峰值生產速度為50M/s,副本數為2;
Kafka機器數量 = 2 * (50 * 2 / 100)+ 1 = 3臺。
4.3.5 項目經驗之壓力測試
Kafka壓測:
用Kafka官方自帶的腳本,對Kafka進行壓測:
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
Kafka壓測時,在硬盤讀寫速度一定的情況下,可以查看到哪些地方出現了瓶頸(CPU,內存,網絡 IO);一般都是網絡 IO 達到瓶頸。
4.3.6 項目經驗之Kafka分區數計算
consumer 吞吐量=50m/s,期望吞吐量 100m/s;
分區數=100 / 20 =5 分區;
建議參考==》
4.4采集日志Flume
4.4.1 日至采集Flume安裝
注??:刪除guava-11.0.2.jar 的服務器節點,一定要配置hadoop環境變量,否則會報異常。
4.4.2 項目經驗之Flume組件選型
1). flume1.6 source:
exec :
優點:可以實時監控文件變化;
缺點:有丟數據的風險。
spooling:
優點:可以實現斷點續傳;
缺點:不能實時監控文件變化。
2). flume 1.7 taildir source??
Taildir Source 相比 Exec Source、Spooling Directory Source 的優勢:
a. 可實現斷點續傳,多目錄;Flume1.6 以前需要自己自定義 Source 記錄每次讀取文件位置,實現斷點續傳。
b. 可以實時監控文件的變化。
Exec Source 可以實時搜集數據,但是在 Flume 不運行或者 Shell 命令出錯的情況下,數據將會丟失;Spooling Directory Source 監控目錄,支持斷點續傳,但不能實時監控文件變化。
3). batchSize 大小如何設置?
答:Event 1K 左右時,500-1000 合適(默認為 100)
file channel??: 數據存儲在磁盤中,可靠性高,效率低;
memory channel??: 數據存儲在內存中,效率高,可靠性差。
采用 Kafka Channel,省去了 Sink,提高了效率。KafkaChannel 數據存儲在 Kafka 里面, 所以數據是存儲在磁盤中。
4.4.3 日志采集 Flume 配置
Flume配置分析:
注??:Flume 直接讀 log 日志的數據,log 日志的格式是 app.yyyy-mm-dd.log。
Flume的具體配置如下:
1). 在/opt/module/flume/conf 目錄下創建 file-flume-kafka.conf 文件:
[xiaobai@hadoop102 conf]$ vim file-flume-kafka.conf2). 在文件配置如下內容:
#定義組件 a1.sources=r1 a1.channels=c1#配置source (Taildirsource) a1.sources.r1.type=TAILDIR a1.sources.r1.filegroups=f1 a1.sources.r1.filegroups.f1=/opt/module/applog/log/app.* a1.sources.r1.positionFile=/opt/module/flume/taildir_position.json#配置攔截器(ETL數據清洗 判斷json是否完整) a1.sources.r1.interceptors=i1 a1.sources.r1.interceptors.i1.type=com.xiaobai.flume.interceptor.ETLInterceptor$Builder#配置channel a1.channels.c1.type=org.apache.flume.channel.kafka.kafkaChannel a1.channels.c1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092 a1.channels.c1.kafka.topic=topic_log a1.channels.c1.parseAsFlumeEvent=false#配置sink(Kafka Channel 無Sink)#拼接組件 a1.sources.r1.channels=c13). 分發file-flume-kafka.conf:
[xiaobai@hadoop102 conf]$ xsync file-flume-kafka.conftips: com.xiaobai.flume.interceptor.ETLInterceptor是自定義的攔截器全類名。
4.4.4 Flume攔截器
打包:
將打包好的帶依賴的jar包傳入hadoop102 的/opt/module/flume/lib 目錄下:
過濾查找是否存在帶依賴的jar包flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar:
4.4.5 測試Flume-Kafka通道
如圖所示就是已經獲取到數據了!我這里用兩個遠程工具同時開了好幾個遠程端口進行不同操作,所以這個端口看起來會不一樣。
注??:
這里獲取不到數據的話,后面會很麻煩!!!!
前車之鑒,一定要認真檢查!配置文件!少個字母多個字母 包名不正確之類的都會引起錯誤!!!若是配置文件都沒錯的話 就檢查Kafka、Flume、Zookeeper是否正確啟動,仔細檢查狀態!!!再檢查Flume的攔截器代碼是否正常!!所引用的包名是否正確!!!!!!!!試圖引起注意!!!!!!!!n多個感嘆號?足以證明會有多重要 我是當時不以為然 沒在意 結果到后面進行不下去了 又返回來重新檢查了一遍的😭!
4.4.6 日至采集Flume啟動停止腳本
??注:
4.5 消費Kafka數據Flume
4.5.1 項目經驗之Flume組件選型
4.5.1.1 FileChannel 和 MemoryChannel 區別
MemoryChannel 傳輸數據速度更快,但因為數據保存在 JVM 的堆內存中,Agent 進程掛掉會導致數據丟失,適用于對數據質量要求不高的需求。
FileChannel 傳輸速度相對于 Memory 慢,但數據安全保障高,Agent 進程掛掉也可以從失敗中恢復數據。
選型:
金融類公司、對錢要求非常準確的公司通常會選擇 FileChannel;
傳輸的是普通日志信息(京東內部一天丟 100 萬-200 萬條,這是非常正常的),通常選擇 MemoryChannel。
4.5.1.2 FileChannel優化
通過配置 dataDirs 指向多個路徑,每個路徑對應不同的硬盤,增大 Flume 吞吐量。 官方說明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir 和 backupCheckpointDir 也盡量配置在不同硬盤對應的目錄中,保證checkpoint 壞掉后,可以快速使用 backupCheckpointDir 恢復數據。
FileChannel底層原理
4.5.1.3 Sink:HDFS Sink
元數據層面: 每個小文件都有一份元數據,其中包括文件路徑,文件名,所有者,所屬組,權限,創建時間等,這些信息都保存在 Namenode 內存中。所以小文件過多,會占用 Namenode 服務器大量內存,影響 Namenode 性能和使用壽命;
計算層面:默認情況下 MR 會對每個小文件啟用一個 Map 任務計算,非常影響計算性能。同時也影響磁盤尋址時間。
官方默認的這三個參數配置寫入 HDFS 后會產生小文件,hdfs.rollInterval、hdfs.rollSize、 hdfs.rollCount
基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0 幾個參數綜合作用,效果如下:
1). 文件在達到 128M 時會滾動生成新文件;
2). 文件創建超 3600 秒時會滾動生成新文件.
4.5.2 Flume時間戳攔截器
由于 flume 默認會用 linux 系統時間,作為輸出到 HDFS 路徑的時間。如果數據是 23:59 分產生的。Flume 消費 kafka 里面的數據時,有可能已經是第二天00:00以后了,那么該部門數據會被發往第二天的 HDFS 路徑。我們希望的是根據日志里面的實際時間,發往 HDFS 的路徑,所以下面攔截器作用是獲取日志中的實際時間。
4.5.3 消費者Flume配置
1). 在 hadoop104 的/opt/module/flume/conf 目錄下創建 kafka-flume-hdfs.conf 文件:
[xiaobai@hadoop104 conf]$ vim kafka-flume-hdfs.conf2). 在文件配置如下內容:
## 定義組件 ## 定義組件 a1.sources=r1 a1.channels=c1 a1.sinks=k1## 配置source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource #a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sources.r1.kafka.topics=topic_log a1.sources.r1.batchSize=5000 a1.sources.r1.batchDurationMills=2000#時間戳攔截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.xiaobai.flume.interceptor.TimeStampInterceptor$Builder## 配置channel a1.channels.c1.type = file a1.channels.c1.checkpointDir=/opt/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs=/opt/module/flume/data/behavior1/ #a1.channels.c1.maxFileSize = 2146435071 #a1.channels.c1.capacity = 1000000 a1.channels.c1.keep-alive = 6## 配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path =/origin_data/gmall/log/topic_log/%Y-%m-%da1.sinks.k1.hdfs.rollInterval=10 a1.sinks.k1.hdfs.rollSize=134217728 a1.sinks.k1.hdfs.rollCount=0a1.sinks.k1.hdfs.filePrefix = log- a1.sinks.k1.hdfs.round = false #a1.sinks.k1.hdfs.fileType=DataStream ##a1.sinks.k1..hdfs.writeFormat=Text # # #a1.sinks.k1.hdfs.rollInterval = 10 #a1.sinks.k1.hdfs.rollSize = 134217728 #a1.sinks.k1.hdfs.rollCount = 0## 控制輸出文件是原生文件 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop## 拼接組件 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c14.5.4 消費者Flume啟動停止腳本
在腳本中填寫如下內容:
#!/bin/bashcase $1 in "start"){for i in hadoop104doecho "------啟動$i消費flume------"ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &"done };; "stop"){for i in hadoop104doecho "------停止$i消費flume------"ssh $i "ssh $i ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print \$2}' | xargs -n1 kill"done };; esac4.6 采集通道啟動停止腳本
五、常見問題及解決方案
5.1 2NN頁面不能顯示完整信息
1. 問題描述:
如圖,訪問2NN頁面http://hadoop104:9868,看不到詳細信息。
2. 解決方法:
輸入以下代碼顯示行號:
:set nu如圖,shift+d:修改替換61行:
return new Date(Number(v)).toLocalString();3. 問題描述:
hadoop104拒絕連接請求。
4. 解決方法:
如果網絡暢通且關閉了防火墻,那可能是由于集群忘記開啟,重新啟動集群再次刷新hadoop104:9868界面即可。
總結
以上是生活随笔為你收集整理的数据仓库之电商数仓-- 1、用户行为数据采集的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [算法] 2-4 组合游戏
- 下一篇: ipad流水布局及其旋转界面view间隔