Flume Hands-On Cases
3.1 Case: Flume uploads files from a monitored directory to HDFS in real time
1) Case requirement: use Flume to monitor an entire directory and collect the files placed in it.
2) Requirement analysis:
3) Implementation steps:
1. Create the configuration file flume-dir-hdfs.conf
Create the file:
[root@linux02 job]$ touch flume-dir-hdfs.conf
Open the file:
[root@linux02 job]$ vim flume-dir-hdfs.conf
Add the following content:
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp; they are not uploaded
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://linux01:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# Number of time units before a new folder is created
a3.sinks.k3.hdfs.roundValue = 1
# The time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true

# Number of events to accumulate before flushing to HDFS once
a3.sinks.k3.hdfs.batchSize = 100
# Set the file type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# Roll to a new file after this many seconds
a3.sinks.k3.hdfs.rollInterval = 30
# Roll the file when it reaches roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2. Start the agent that monitors the folder
[root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
Note: when using the Spooling Directory Source, do not create files in the monitored directory and keep writing to them afterwards; a file should be complete when it is dropped into the directory. Once a file has been ingested it is renamed with the .COMPLETED suffix (fileSuffix above), and files matching ignorePattern (here, anything ending in .tmp) are skipped.
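If you want to watch events while testing, the same agent can also be started with console logging enabled; this simply adds the logger option that is used again in the later cases:
[root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console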
3. Add files to the upload folder
Create the upload folder under /opt/module/flume (the spooling directory must exist before the agent is started, so create it first if you have not already):
[root@linux02 flume]$ mkdir upload
Add files to the upload folder:
[root@linux02 upload]$ touch hadoop.txt
[root@linux02 upload]$ touch hadoop.tmp
[root@linux02 upload]$ touch hadoop.log
4. Check the data on HDFS
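For example, with the HDFS path configured above, the uploaded files can be listed and inspected from the command line (a sketch; the actual date and hour directories depend on when the agent runs):
[root@linux02 flume]$ hdfs dfs -ls -R /flume/upload
[root@linux02 flume]$ hdfs dfs -cat /flume/upload/<yyyyMMdd>/<HH>/upload-*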
5. Wait a moment (about 1 s), then check the upload folder again
[root@linux02 upload]$ ll
total 0
-rw-rw-r--. 1 hadoop hadoop 0 May 20 22:31 bigdata.log.COMPLETED
-rw-rw-r--. 1 hadoop hadoop 0 May 20 22:31 bigdata.tmp
-rw-rw-r--. 1 hadoop hadoop 0 May 20 22:31 bigdata.txt.COMPLETED
3.2 [Key case] Flume uploads newly appended content of a local file to HDFS in real time
1) Case requirement: monitor the Hive log in real time and upload it to HDFS.
2) Requirement analysis:
3) Implementation steps:
Create the flume-file-hdfs.conf file
Create the file:
[root@linux02 job]$ touch flume-file-hdfs.conf
Note: to read a file on a Linux system, the agent has to follow the rules of Linux commands. Since the Hive log lives on the Linux filesystem, the source type is exec (as in execute), meaning the file is read by executing a Linux command.
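The source configured below uses tail -F (capital F, equivalent to --follow=name --retry), so the command keeps following hive.log even if the log file is rotated or recreated; with a lowercase -f the agent would stop receiving data after a rotation. You can try the command by itself first (a quick sketch):
[root@linux02 job]$ tail -F /opt/module/hive/logs/hive.log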
[root@linux02 job]$ vim flume-file-hdfs.conf
Add the following content:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://linux01:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# Number of time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS once
a2.sinks.k2.hdfs.batchSize = 1000
# Set the file type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# Roll to a new file after this many seconds
a2.sinks.k2.hdfs.rollInterval = 600
# Set the roll size for each file
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
Run the agent with this monitoring configuration:
[root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
Start Hadoop and Hive, then run Hive operations to generate log output:
[root@linux02 hadoop-2.7.2]$ sbin/start-dfs.sh
[root@linux02 hadoop-2.7.2]$ sbin/start-yarn.sh
[root@linux02 hive]$ bin/hive
hive (default)>
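Any Hive activity that writes to hive.log will do; for example (a sketch, the table name flume_test is only an illustration):
hive (default)> show databases;
hive (default)> create table if not exists flume_test(id int);
hive (default)> select count(*) from flume_test;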
Check the files on HDFS
3.3 Case: single data source, multiple outputs
1) Case requirement: a single Source feeding multiple Channels and Sinks, as shown in the figure.
Figure: single Source with multiple Channels and Sinks
2) Requirement analysis:
3) Implementation steps:
0. Preparation
Create a group1 folder under the job directory:
[root@linux02 job]$ mkdir group1
[root@linux02 job]$ cd group1/
Create a flume3 folder under /opt/module/datas/:
[root@linux02 datas]$ mkdir flume3
1. Create flume-file-flume.conf
Configure one source that reads the log file, two channels, and two sinks that deliver the data to flume-flume-hdfs and flume-flume-dir respectively.
Create and open the configuration file:
[root@linux02 group1]$ touch flume-file-flume.conf
[root@linux02 group1]$ vim flume-file-flume.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux02
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Note: Avro is a language-neutral data serialization and RPC framework created by Doug Cutting, the founder of Hadoop.
Note: RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over a network, without needing to understand the underlying network technology.
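As an aside, an avro hop like this can also be exercised by hand with the avro-client mode of flume-ng once the receiving agent is running; a sketch (the file path is only an example):
[root@linux02 flume]$ bin/flume-ng avro-client -H linux02 -p 4141 -F /opt/module/datas/test.txt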
2. Create flume-flume-hdfs.conf
Configure a source that receives the output of the upstream Flume agent, and a sink that writes to HDFS.
Create and open the configuration file:
[root@linux02 group1]$ touch flume-flume-hdfs.conf
[root@linux02 group1]$ vim flume-flume-hdfs.conf
Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = linux02
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://linux01:9000/flume2/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# Number of time units before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS once
a2.sinks.k1.hdfs.batchSize = 100
# Set the file type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# Roll to a new file after this many seconds
a2.sinks.k1.hdfs.rollInterval = 600
# Roll the file when it reaches roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a2.sinks.k1.hdfs.rollCount = 0
# Minimum number of block replicas
a2.sinks.k1.hdfs.minBlockReplicas = 1

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3. Create flume-flume-dir.conf
Configure a source that receives the output of the upstream Flume agent, and a sink that writes to a local directory.
Create and open the configuration file:
[root@linux02 group1]$ touch flume-flume-dir.conf
[root@linux02 group1]$ vim flume-flume-dir.conf
Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux02
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
Tip: the local output directory must already exist; if it does not, the file_roll sink will not create it.
4. Run the configuration files
Start the agents with the corresponding configuration files, in this order: flume-flume-dir, flume-flume-hdfs, flume-file-flume.
[root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
[root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
[root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
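Optionally, you can confirm that the two avro sources are listening before generating any data (a quick sanity check; on systems without net-tools, ss -nltp works the same way):
[root@linux02 flume]$ netstat -nltp | grep -E '4141|4142'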
5. Start Hadoop and Hive
[root@linux02 hadoop-2.7.2]$ sbin/start-dfs.sh
[root@linux02 hadoop-2.7.2]$ sbin/start-yarn.sh
[root@linux02 hive]$ bin/hive
hive (default)>

6. Check the data on HDFS

7. Check the data in the /opt/module/datas/flume3 directory
[root@linux02 flume3]$ ll
total 8
-rw-rw-r--. 1 root root 5942 May 22 00:09 1526918887550-3
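To confirm the content, cat the rolled file(s); the file name above comes from this particular run and will differ on your machine, since the file_roll sink starts a new file every 30 seconds by default:
[root@linux02 flume3]$ cat 1526918887550-3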
3.4 Case: aggregating multiple data sources
Multiple Sources aggregated into a single Flume agent, as shown in the figure.
Figure: multiple Flume agents aggregating data into a single Flume agent
flume-1 on linux02 monitors the file hive.log,
flume-2 on linux03 monitors a data stream on a port,
flume-1 and flume-2 send their data to flume-3 on linux02, and flume-3 prints the merged data to the console.

3) Implementation steps:
0. Preparation
Distribute Flume to the other node:
[root@linux02 module]$ scp flume
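A complete command might look like the following; the destination host and path are assumptions based on the hosts used in this case:
[root@linux02 module]$ scp -r /opt/module/flume root@linux03:/opt/module/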
Create a group2 folder under /opt/module/flume/job on both linux02 and linux03:
[root@linux02 job]$ mkdir group2
[root@linux03 job]$ mkdir group2
1. Create flume1.conf
Configure a source that monitors the hive.log file and a sink that sends the data to the next-tier Flume agent.
Create and open the configuration file on linux02:
[root@linux02 group2]$ touch flume1.conf
[root@linux02 group2]$ vim flume1.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux02
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Create flume2.conf
Configure a source that monitors the data stream on port 44444 and a sink that sends the data to the next-tier Flume agent.
Create and open the configuration file on linux03:
[root@linux03 group2]$ touch flume2.conf
[root@linux03 group2]$ vim flume2.conf
Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = linux03
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = linux02
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3. Create flume3.conf
Configure a source that receives the data streams sent by flume1 and flume2; after merging, the sink prints the data to the console.
Create and open the configuration file on linux02:
[root@linux02 group2]$ touch flume3.conf
[root@linux02 group2]$ vim flume3.conf
Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux02
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
4. Run the configuration files
Start the agents with the corresponding configuration files, in this order: flume3.conf, flume2.conf, flume1.conf.
[root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume3.conf -Dflume.root.logger=INFO,console
[root@linux03 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume2.conf
[root@linux02 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume1.conf
5. On linux02, append content to the monitored Hive log
[root@linux02 module]$ echo 'hello' >> /opt/module/hive/logs/hive.log
6. On linux03, send data to port 44444
[root@linux03 flume]$ telnet linux03 44444
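If telnet is not installed, nc works just as well for the netcat source (a sketch):
[root@linux03 flume]$ nc linux03 44444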
7. Check the data
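On the console of agent a3 (flume3), the logger sink should print the received events, roughly in this form (a sketch; the exact spacing and headers depend on the Flume version):
Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }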
總結
- 上一篇: Flume安装
- 下一篇: OneData建设探索之路:SaaS收银