Hadoop生态Flume(三)拦截器(Interceptor)介绍与使用(1)
轉(zhuǎn)載自?Flume中的攔截器(Interceptor)介紹與使用(一)
Flume中的攔截器(interceptor)
用戶Source讀取events發(fā)送到Sink的時(shí)候,在events header中加入一些有用的信息,或者對(duì)events的內(nèi)容進(jìn)行過濾,完成初步的數(shù)據(jù)清洗。這在實(shí)際業(yè)務(wù)場(chǎng)景中非常有用.
Flume-ng 1.6中目前提供了以下攔截器:
Timestamp Interceptor; Host Interceptor; Static Interceptor; UUID Interceptor; Morphline Interceptor; Search and Replace Interceptor; Regex Filtering Interceptor; Regex Extractor Interceptor;本文對(duì)常用的幾種攔截器進(jìn)行學(xué)習(xí)和介紹,并附上使用示例。
本文中使用的Source為TaildirSource,就是監(jiān)控一個(gè)文件的變化,將內(nèi)容發(fā)送給Sink,具體可參考《Flume中的TaildirSource》.
Source配置如下:
#-->設(shè)置sources名稱 agent_lxw1234.sources = sources1 #--> 設(shè)置channel名稱 agent_lxw1234.channels = fileChannel #--> 設(shè)置sink 名稱 agent_lxw1234.sinks = sink1# source 配置 agent_lxw1234.sources.sources1.type = com.lxw1234.flume17.TaildirSource agent_lxw1234.sources.sources1.positionFile = /tmp/flume/agent_lxw1234_position.json agent_lxw1234.sources.sources1.filegroups = f1 agent_lxw1234.sources.sources1.filegroups.f1 = /tmp/lxw1234_.*.log agent_lxw1234.sources.sources1.batchSize = 100 agent_lxw1234.sources.sources1.backoffSleepIncrement = 1000 agent_lxw1234.sources.sources1.maxBackoffSleep = 5000 agent_lxw1234.sources.sources1.channels = fileChannelFlume Source中使用攔截器的相關(guān)配置如下:
## source 攔截器 agent_lxw1234.sources.sources1.interceptors = i1 i2 agent_lxw1234.sources.sources1.interceptors.i1.type = host agent_lxw1234.sources.sources1.interceptors.i1.useIP = false agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost agent_lxw1234.sources.sources1.interceptors.i2.type = timestamp對(duì)一個(gè)Source可以使用多個(gè)攔截器。
?
一、Timestamp Interceptor
時(shí)間戳攔截器,將當(dāng)前時(shí)間戳(毫秒)加入到events header中,key名字為:timestamp,值為當(dāng)前時(shí)間戳。用的不是很多。比如在使用HDFS Sink時(shí)候,根據(jù)events的時(shí)間戳生成結(jié)果文件,hdfs.path = hdfs://cdh5/tmp/dap/%Y%m%d
hdfs.filePrefix = log_%Y%m%d_%H
會(huì)根據(jù)時(shí)間戳將數(shù)據(jù)寫入相應(yīng)的文件中。
但可以用其他方式代替(設(shè)置useLocalTimeStamp = true)。
?
二、Host Interceptor
主機(jī)名攔截器。將運(yùn)行Flume agent的主機(jī)名或者IP地址加入到events header中,key名字為:host(也可自定義)。
根據(jù)上面的Source,攔截器的配置如下:
## source 攔截器 agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = host agent_lxw1234.sources.sources1.interceptors.i1.useIP = false agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost# sink 1 配置 agent_lxw1234.sinks.sink1.type = hdfs agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{agentHost} agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text agent_lxw1234.sinks.sink1.hdfs.rollCount = 0 agent_lxw1234.sinks.sink1.hdfs.rollSize = 0 agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600 agent_lxw1234.sinks.sink1.hdfs.batchSize = 500 agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10 agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0 agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1 agent_lxw1234.sinks.sink1.channel = fileChannel該配置用于將source的events保存到HDFS上hdfs://cdh5/tmp/lxw1234的目錄下,文件名為lxw1234_<主機(jī)名>.log
?
三、Static Interceptor
靜態(tài)攔截器,用于在events header中加入一組靜態(tài)的key和value。
根據(jù)上面的Source,攔截器的配置如下:
## source 攔截器 agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = static agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true agent_lxw1234.sources.sources1.interceptors.i1.key = static_key agent_lxw1234.sources.sources1.interceptors.i1.value = static_value# sink 1 配置 agent_lxw1234.sinks.sink1.type = hdfs agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234 agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{static_key} agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text agent_lxw1234.sinks.sink1.hdfs.rollCount = 0 agent_lxw1234.sinks.sink1.hdfs.rollSize = 0 agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600 agent_lxw1234.sinks.sink1.hdfs.batchSize = 500 agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10 agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0 agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1 agent_lxw1234.sinks.sink1.channel = fileChannel看看最終Sink在HDFS上生成的文件結(jié)構(gòu):
?
四、UUID Interceptor
UUID攔截器,用于在每個(gè)events header中生成一個(gè)UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中讀取并使用。根據(jù)上面的source,攔截器的配置如下:
## source 攔截器 agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder agent_lxw1234.sources.sources1.interceptors.i1.headerName = uuid agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true agent_lxw1234.sources.sources1.interceptors.i1.prefix = UUID_# sink 1 配置 agent_lxw1234.sinks.sink1.type = logger agent_lxw1234.sinks.sink1.channel = fileChannel運(yùn)行后在日志中查看header信息:
?
五、Morphline Interceptor
Morphline攔截器,該攔截器使用Morphline對(duì)每個(gè)events數(shù)據(jù)做相應(yīng)的轉(zhuǎn)換。關(guān)于Morphline的使用,可參考
http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
后續(xù)再研究這塊。
?
總結(jié)
以上是生活随笔為你收集整理的Hadoop生态Flume(三)拦截器(Interceptor)介绍与使用(1)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 快速选择截图中的需要内容快速选择截图中的
- 下一篇: 再用笔记本键盘笔记本再配键盘