ElasticSearch从入门到精通:Logstash妙用
hello,大家好,我是 Jackpop,碩士畢業(yè)于哈爾濱工業(yè)大學(xué),曾在華為、阿里等大廠工作,如果你對(duì)升學(xué)、就業(yè)、技術(shù)提升等有疑惑,不妨交個(gè)朋友:
我是Jackpop,我們交個(gè)朋友吧!
在本系列文章的第3部分關(guān)于實(shí)時(shí)流處理的文章中,我們學(xué)習(xí)了如何使用ElasticSearch的批量API以及利用REST API將.json航班數(shù)據(jù)文件導(dǎo)入ElasticSearch。
在這篇文章中,我們將介紹另一種方式,Logstash。
Logstash介紹
Logstash是一個(gè)開(kāi)源的數(shù)據(jù)收集引擎,具有實(shí)時(shí)流水線功能。
它從多個(gè)源頭接收數(shù)據(jù),進(jìn)行數(shù)據(jù)處理,然后將轉(zhuǎn)化后的信息發(fā)送到stash,即存儲(chǔ)。
Logstash允許我們將任何格式的數(shù)據(jù)導(dǎo)入到任何數(shù)據(jù)存儲(chǔ)中,不僅僅是ElasticSearch。
它可以用來(lái)將數(shù)據(jù)并行導(dǎo)入到其他NoSQL數(shù)據(jù)庫(kù),如MongoDB或Hadoop,甚至導(dǎo)入到AWS。
數(shù)據(jù)可以存儲(chǔ)在文件中,也可以通過(guò)流等方式進(jìn)行傳遞。
Logstash對(duì)數(shù)據(jù)進(jìn)行解析、轉(zhuǎn)換和過(guò)濾。它還可以從非結(jié)構(gòu)化數(shù)據(jù)中推導(dǎo)出結(jié)構(gòu),對(duì)個(gè)人數(shù)據(jù)進(jìn)行匿名處理,可以進(jìn)行地理位置查詢等等。
一個(gè)Logstash管道有兩個(gè)必要的元素,輸入和輸出,以及一個(gè)可選的元素,過(guò)濾器。
輸入組件從源頭消耗數(shù)據(jù),過(guò)濾組件轉(zhuǎn)換數(shù)據(jù),輸出組件將數(shù)據(jù)寫(xiě)入一個(gè)或多個(gè)目的地。
所以,我們的示例場(chǎng)景的Logstash架構(gòu)基本如下。
我們從.json文件中讀取我們的航班數(shù)據(jù),我們對(duì)它們進(jìn)行處理/轉(zhuǎn)換,應(yīng)用一些過(guò)濾器并將它們存儲(chǔ)到ElasticSearch中。
Logstash安裝
有幾種選擇來(lái)安裝Logstash。
一種是訪問(wèn)網(wǎng)站下載你平臺(tái)的存檔,然后解壓到一個(gè)文件夾。
你也可以使用你的平臺(tái)的包管理器來(lái)安裝,比如yum、apt-get或homebrew,或者作為docker鏡像來(lái)安裝。
確保你已經(jīng)定義了一個(gè)環(huán)境變量JAVA_HOME,指向JDK 8或11或14的安裝(Logstash自帶嵌入式AdoptJDK)。
Logstash工作流
一旦你安裝了它,讓我們通過(guò)運(yùn)行最基本的Logstash工作流來(lái)測(cè)試你的Logstash安裝情況。
bin/logstash -e 'input { stdin { } } output { stdout {} }'上面的工作流接受來(lái)自stdin(即你的鍵盤(pán))的輸入,并將其輸出到stdout(即你的屏幕)。
上面的工作流中沒(méi)有定義任何過(guò)濾器。一旦你看到logstash被成功啟動(dòng)的消息,輸入一些東西(我輸入的是Hello world),按ENTER鍵,你應(yīng)該看到產(chǎn)生的消息的結(jié)構(gòu)格式,像下面這樣。
[2021-02-11T21:52:57,120][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} Hello world {"message" => "Hello world","@version" => "1","@timestamp" => 2021-02-11T19:57:46.208Z,"host" => "MacBook-Pro.local" }然而,通常Logstash是通過(guò)配置文件來(lái)工作的,配置文件告訴它該做什么,即在哪里找到它的輸入,如何轉(zhuǎn)換它,在哪里存儲(chǔ)它。Logstash配置文件的結(jié)構(gòu)基本上包括三個(gè)部分:輸入、過(guò)濾和輸出。
你在輸入部分指定數(shù)據(jù)的來(lái)源,在輸出部分指定目的地。在過(guò)濾器部分,你可以使用支持的過(guò)濾器插件來(lái)操作、測(cè)量和創(chuàng)建事件。
配置文件的結(jié)構(gòu)如下面的代碼示例所示。
input {...} filter {...} output{...}你需要?jiǎng)?chuàng)建一個(gè)配置文件,指定你要使用的組件和每個(gè)組件的設(shè)置。在config文件夾中已經(jīng)存在一個(gè)配置文件樣本,logstash-sample.conf。
其內(nèi)容如下所示。
# Sample Logstash configuration for creating a simple # Beats -> Logstash -> Elasticsearch pipeline.input {beats {port => 5044} }output {elasticsearch {hosts => ["http://localhost:9200"]index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"#user => "elastic"#password => "changeme"} }這里input部分定義了Logstash應(yīng)該從哪里獲取數(shù)據(jù)。這里有一個(gè)可用的輸入插件列表。
我們的輸入不是來(lái)自Beats組件,而是來(lái)自文件系統(tǒng),所以我們使用文件輸入組件。
input {file {start_position => "beginning"path => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json"codec => "json"} }我們使用start_position參數(shù)來(lái)告訴插件從頭開(kāi)始讀取文件。
需要注意,數(shù)據(jù)路徑必須是絕對(duì)的。
我們使用的是json編解碼器,除了json,還可以使用純文本形式。
在下載的數(shù)據(jù)中,可以找到一個(gè)名為test.json的文件。它只由2條航班數(shù)據(jù)組成的文件。
輸出塊定義了Logstash應(yīng)該在哪里存儲(chǔ)數(shù)據(jù)。我們將使用ElasticSearch來(lái)存儲(chǔ)我們的數(shù)據(jù)。
我們添加了第二個(gè)輸出作為我們的控制臺(tái),并使用rubydebugger格式化輸出,第三個(gè)輸出作為文件系統(tǒng),最后兩個(gè)用于測(cè)試我們的輸出。 我們將輸出存儲(chǔ)在output.json中。
output {elasticsearch {hosts => ["http://localhost:9200"]index => "testflight"}file {path => "/usr/local/Cellar/logstash-full/7.11.0/data/output.json"}stdout {codec => rubydebug} }此外,還可以定義過(guò)濾器來(lái)對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換。
Logstash提供了大量的過(guò)濾器,下面介紹一些非常常用的的過(guò)濾器:
- grok:解析任何任意文本并添加結(jié)構(gòu),它包含120種內(nèi)置模式
- mutate:對(duì)字段進(jìn)行一般的轉(zhuǎn)換,例如重命名、刪除、替換和修改字段
- drop:丟棄一個(gè)數(shù)據(jù)
- clone:復(fù)制一個(gè)數(shù)據(jù),可能增加或刪除字段
- geoip:添加IP地址的地理位置信息
- split:將多行消息、字符串或數(shù)組分割成不同的數(shù)據(jù)
可以通過(guò)執(zhí)行下方命令查看 Logstash 安裝中安裝的全部插件列表。
$ bin/logstash-plugin list你會(huì)注意到,有一個(gè)JSON過(guò)濾器插件。這個(gè)插件可以解析.json文件并創(chuàng)建相應(yīng)的JSON數(shù)據(jù)結(jié)構(gòu)。
正確地選擇和配置過(guò)濾器是非常重要的,否則,你最終的輸出中沒(méi)有數(shù)據(jù)。
所以,在我們的過(guò)濾塊中,我們啟用json插件,并告訴它我們的數(shù)據(jù)在消息字段中。
filter {json {source => "message"} }到此為止,完成的配置文件config/testflight.conf內(nèi)容如下:
input {file {start_position => "beginning"path => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json"codec => "json"} }filter {json {source => "message"} }output { # elasticsearch { # hosts => ["http://localhost:9200/"] # index => "testflight" # }file {path => "/usr/local/Cellar/logstash-full/7.11.0/data/output.json"}stdout {codec => rubydebug} }你可以通過(guò)如下命令進(jìn)行一下測(cè)試:
bin/logstash -f config/testflight.conf --config.test_and_exit ... Configuration OK [2021-02-11T23:15:38,997][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash如果配置文件通過(guò)了配置測(cè)試,用以下命令啟動(dòng)Logstash。
bin/logstash -f config/testflight.conf --config.reload.automatic ...–config.reload.automatic配置選項(xiàng)可以實(shí)現(xiàn)自動(dòng)重載配置,這樣你就不必每次修改配置文件時(shí)都要停止并重新啟動(dòng)Logstash。
如果一切順利,你應(yīng)該會(huì)看到如下的輸出結(jié)果。
{"CMsgs" => 1,"@version" => "1","PosTime" => 1467378028852,"Rcvr" => 1,"EngMount" => 0,"Tisb" => false,"Mil" => false,"Trt" => 2,"Icao" => "A0835D","Long" => -82.925616,"InHg" => 29.9409447,"VsiT" => 1,"ResetTrail" => true,"CallSus" => false,"@timestamp" => 2021-02-14T18:32:16.337Z,"host" => "MacBook-Pro.local","OpIcao" => "RPA","Man" => "Embraer","GAlt" => 2421,"TT" => "a","Bad" => false,"HasSig" => true,"TSecs" => 1,"Vsi" => 2176,"EngType" => 3,"Reg" => "N132HQ","Alt" => 2400,"Species" => 1,"FlightsCount" => 0,"WTC" => 2,"Cos" => [[0] 39.984322,[1] -82.925616,[2] 1467378028852.0,[3] nil],"message" => "{"Id":10519389,"Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false,"Reg":"N132HQ","FSeen":"\/Date(1467378028852)\/","TSecs":1,"CMsgs":1,"Alt":2400,"GAlt":2421,"InHg":29.9409447,"AltT":0,"Lat":39.984322,"Long":-82.925616,"PosTime":1467378028852,"Mlat":true,"Tisb":false,"Spd":135.8,"Trak":223.2,"TrkH":false,"Type":"E170","Mdl":"2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR","Man":"Embraer","CNum":"17000216","Op":"REPUBLIC AIRLINE INC - INDIANAPOLIS, IN","OpIcao":"RPA","Sqk":"","Vsi":2176,"VsiT":1,"WTC":2,"Species":1,"Engines":"2","EngType":3,"EngMount":0,"Mil":false,"Cou":"United States","HasPic":false,"Interested":false,"FlightsCount":0,"Gnd":false,"SpdTyp":0,"CallSus":false,"ResetTrail":true,"TT":"a","Trt":2,"Year":"2008","Cos":[39.984322,-82.925616,1467378028852.0,null]}","Lat" => 39.984322,"TrkH" => false,"Op" => "REPUBLIC AIRLINE INC - INDIANAPOLIS, IN","Engines" => "2","Sqk" => "","Id" => 10519389,"Gnd" => false,"CNum" => "17000216","path" => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json","Cou" => "United States","HasPic" => false,"FSeen" => "/Date(1467378028852)/","Interested" => false,"Mdl" => "2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR","Spd" => 135.8,"Sig" => 0,"Trak" => 223.2,"Year" => "2008","SpdTyp" => 0,"AltT" => 0,"Type" => "E170","Mlat" => true }數(shù)據(jù)轉(zhuǎn)換
首先,讓我們從輸出中刪除path, @version, @timestamp, host和message,這些都是logstash添加的。
filter {json {source => "message"}mutate {remove_field => ["path", "@version", "@timestamp", "host", "message"]} }mutate過(guò)濾器組件可以刪除不需要的字段。
重新運(yùn)行:
bin/logstash -f config/flightdata-logstash.conf –-config.test_and_exit bin/logstash -f config/flightdata-logstash.conf --config.reload.automatic接下來(lái),我們將_id設(shè)置為Id。
output {elasticsearch {hosts => ["http://localhost:9200"]index => "testflight"document_id => "%{Id}"}我們?cè)谳敵鼋M件中通過(guò)設(shè)置document_id來(lái)實(shí)現(xiàn)。
然而,如果你重新運(yùn)行l(wèi)ogstash,你會(huì)發(fā)現(xiàn)Id字段仍然存在。
有一個(gè)竅門(mén),在過(guò)濾插件中把它改名為[@metadata][Id],然后在輸出中使用,@metadata字段被自動(dòng)刪除。
filter {json {source => "message"}mutate {remove_field => ["path", "@version", "@timestamp", "host", "message"]rename => { "[Id]" => "[@metadata][Id]" }} }output {elasticsearch {hosts => ["http://localhost:9200"]index => "flight-logstash"document_id => "%{[@metadata][Id]}"} ...現(xiàn)在讓我們嘗試解析日期。如果你還記得,這是我們?cè)谏弦黄恼轮袥](méi)有做的事情,我們需要將日期轉(zhuǎn)換為更適合人們熟悉的格式。
例如:
"FSeen" => "/Date(1467378028852)/"需要將時(shí)間1467378028852轉(zhuǎn)化成容易閱讀的格式,并且去掉前后多余的字符串,通過(guò)gsub組件可以實(shí)現(xiàn)這項(xiàng)功能:
gsub => [# get rid of /Date("FSeen", "/Date(", "",# get rid of )/"FSeen", ")/", ""]這里通過(guò)gsub去掉了數(shù)據(jù)中/Date()\等多余部分,輸出結(jié)果為:
"FSeen" : "1467378028852"然后把時(shí)間戳轉(zhuǎn)換成熟悉的格式:
date {timezone => "UTC"match => ["FSeen", "UNIX_MS"]target => "FSeen" }UNIX_MS是UNIX時(shí)間戳,單位是毫秒。我們匹配字段FSeen并將結(jié)果存儲(chǔ)在同一字段中,輸出結(jié)果為:
"FSeen" : "2016-07-01T13:00:28.852Z",上述轉(zhuǎn)換的完整代碼如下:
mutate {gsub => [# get rid of /Date("FSeen", "/Date(", "",# get rid of )/"FSeen", ")/", ""] } date {timezone => "UTC"match => ["FSeen", "UNIX_MS"]target => "FSeen" }在這部分中,我們學(xué)習(xí)了如何使用Logstash將.json航班數(shù)據(jù)批量文件導(dǎo)入到ElasticSearch中。Logstash是一個(gè)非常方便的方式,它有很多過(guò)濾器,支持很多數(shù)據(jù)類型,你只需要學(xué)習(xí)如何編寫(xiě)一個(gè)配置文件就可以了!
Logstash是否適合實(shí)時(shí)數(shù)據(jù)處理?
答案是:要看情況
Logstash主要是為批處理數(shù)據(jù)而設(shè)計(jì)的,比如日志數(shù)據(jù),也許不適合處理來(lái)自傳感器的實(shí)時(shí)航班數(shù)據(jù)。
不過(guò),你可以參考一些參考資料,這些資料描述了如何創(chuàng)建可以擴(kuò)展的Logstash部署,并使用Redis作為L(zhǎng)ogstash代理和Logstash中央服務(wù)器之間的中介,以便處理許多事件并實(shí)時(shí)處理它們。
總結(jié)
以上是生活随笔為你收集整理的ElasticSearch从入门到精通:Logstash妙用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: HTML5 新标签总汇
- 下一篇: 服务器怎么用光驱装系统教程,使用光驱重装