ELK Study 6: Kafka-Logstash-Elasticsearch Data Flow Operations
Logstash configuration
Set up the input and output conditions in Logstash:
[hadoop@Slave1 ~]$ cd /usr/local/
[hadoop@Slave1 local]$ cd logstash/
[hadoop@Slave1 logstash]$ ls
bin           CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE     vendor
CHANGELOG.md  Gemfile       lib                     NOTICE.TXT
[hadoop@Slave1 logstash]$ mkdir -p conf
[hadoop@Slave1 logstash]$ ls
bin           conf          Gemfile                 lib      NOTICE.TXT
CHANGELOG.md  CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE  vendor
[hadoop@Slave1 logstash]$ cd conf
[hadoop@Slave1 conf]$ ls
[hadoop@Slave1 conf]$ touch kafkaInput_esOutPut.conf
[hadoop@Slave1 conf]$ ls
kafkaInput_esOutPut.conf
[hadoop@Slave1 conf]$
[hadoop@Slave1 conf]$ vim kafkaInput_esOutPut.conf
Edit kafkaInput_esOutPut.conf. The contents used on this machine are as follows:
input {
    kafka {
        zk_connect => "192.168.154.158:2181,192.168.154.159:2181,192.168.154.160:2181"
        group_id => "test-consumer-group"
        topic_id => "logStash"
        reset_beginning => false # boolean (optional), default: false
        consumer_threads => 5  # number (optional), default: 1
        decorate_events => true # boolean (optional), default: false
    }
}

filter {
    mutate {
        # Split the message content on commas; the result is stored as an array.
        # For example: abc,efg => message[0] = abc, message[1] = efg
        split => ["message", ","]
    }
    # Add four fields populated from the elements of the split array.
    mutate {
        add_field => {
            "source_Ip" => "%{[message][0]}"
            "source_Port" => "%{[message][1]}"
            "dest_Ip" => "%{[message][2]}"
            "dest_Port" => "%{[message][3]}"
        }
    }
}

output {
    elasticsearch {
        host => "localhost"
    }
}
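To make the effect of the two mutate blocks concrete, here is a minimal sketch in plain shell. It is only an illustration of the split and add_field steps on one record, not how Logstash executes the filter; split_record is a hypothetical helper and the sample record is made up.

```shell
# Illustration only: mimics the two mutate blocks on a single record.
# split_record is a hypothetical helper, not part of Logstash.
split_record() (
    # Equivalent of: split => ["message", ","]
    IFS=','
    set -f          # do not glob-expand pieces of the record
    set -- $1       # the four pieces become $1..$4 inside this subshell
    # Equivalent of the add_field block: one named field per array element.
    printf 'source_Ip=%s source_Port=%s dest_Ip=%s dest_Port=%s\n' \
        "$1" "$2" "$3" "$4"
)

# Sample record (assumed values): source IP, source port, destination IP, destination port.
split_record "192.168.154.158,5000,192.168.154.159,9092"
```

The function body runs in a subshell, so the changed IFS does not leak into the calling shell.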
SSH into Slave2 and Slave3 in turn and copy kafkaInput_esOutPut.conf to both machines:
Creating the conf directory:
[hadoop@Slave1 conf]$ ssh Slave2
Last login: Wed Oct 14 10:58:06 2015 from slave1
[hadoop@Slave2 ~]$ cd /usr/local/logstash/
[hadoop@Slave2 logstash]$ mkdir -p conf
[hadoop@Slave2 logstash]$ ls
bin           conf          Gemfile                 lib      NOTICE.TXT
CHANGELOG.md  CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE  vendor
[hadoop@Slave2 logstash]$ exit
logout
Connection to Slave2 closed.
[hadoop@Slave1 conf]$ ssh Slave3
Last login: Wed Oct 14 10:59:01 2015 from slave2
[hadoop@Slave3 ~]$ cd /usr/local/logstash/
[hadoop@Slave3 logstash]$ mkdir -p conf
[hadoop@Slave3 logstash]$ ls
bin           conf          Gemfile                 lib      NOTICE.TXT
CHANGELOG.md  CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE  vendor
[hadoop@Slave3 logstash]$ exit
logout
Connection to Slave3 closed.
Transferring the file:
[hadoop@Slave1 conf]$ scp kafkaInput_esOutPut.conf Slave2:/usr/local/logstash/conf/
kafkaInput_esOutPut.conf                      100% 1063     1.0KB/s   00:00
[hadoop@Slave1 conf]$ scp kafkaInput_esOutPut.conf Slave3:/usr/local/logstash/conf/
kafkaInput_esOutPut.conf                      100% 1063     1.0KB/s   00:00
[hadoop@Slave1 conf]$ ssh Slave2
Last login: Tue Oct 27 23:46:19 2015 from slave1
[hadoop@Slave2 ~]$ cd /usr/local/logstash/conf/
[hadoop@Slave2 conf]$ ls
kafkaInput_esOutPut.conf
[hadoop@Slave2 conf]$
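The directory creation and copy steps are the same for every host, so they could be generated in a loop. The sketch below only prints the commands (a dry run) so that it is safe to try anywhere; print_copy_commands is a hypothetical helper, and it assumes the same /usr/local/logstash/conf path on every machine, as in the sessions above.

```shell
# Dry run: print the ssh/scp commands instead of executing them.
# print_copy_commands is a hypothetical helper; the target path is the one
# used in this post and is assumed to be identical on every host.
print_copy_commands() {
    file=$1
    shift
    for host in "$@"; do
        printf 'ssh %s mkdir -p /usr/local/logstash/conf\n' "$host"
        printf 'scp %s %s:/usr/local/logstash/conf/\n' "$file" "$host"
    done
}

print_copy_commands kafkaInput_esOutPut.conf Slave2 Slave3
```

Once the printed commands look right, the output can be piped to `sh` to run them.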
Kafka operations
Start ZooKeeper on the three machines:
Turn off the firewall:
[hadoop@Slave1 bin]$ su
Password:
[root@Slave1 bin]# service iptables stop
iptables: Setting chains to policy ACCEPT: filter          [  OK  ]
iptables: Flushing firewall rules:                         [  OK  ]
iptables: Unloading modules:                               [  OK  ]
[root@Slave1 bin]# exit
exit
[hadoop@Slave1 bin]$
Start ZooKeeper:
[hadoop@Slave1 bin]$ ./zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
After doing the same on the other two machines, check the status:
[hadoop@Slave1 bin]$ ./zkServer.sh status
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
[hadoop@Slave1 bin]$
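In a healthy three-node ensemble, one node should report leader and the other two follower. A small sketch for pulling that value out of the status output follows; zk_mode is a hypothetical helper, and the sample input is copied from the session above.

```shell
# zk_mode is a hypothetical helper: it reads `zkServer.sh status` output
# on stdin and prints the value of the "Mode:" line.
zk_mode() {
    sed -n 's/^Mode: *//p'
}

# Sample input copied from the status output shown in this post.
zk_mode <<'EOF'
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
EOF
```

On a live node the same helper could be used as `./zkServer.sh status 2>&1 | zk_mode`.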
Start Kafka on the three machines, taking Slave1 as an example:
[hadoop@Slave1 bin]$ cd /usr/local/kafka/
[hadoop@Slave1 kafka]$ bin/kafka-server-start.sh config/server.properties
Create a topic named logStash:
[hadoop@Slave1 ~]$ cd /usr/local/kafka/
[hadoop@Slave1 kafka]$ cd bin
[hadoop@Slave1 bin]$ sh kafka-topics.sh --create --topic logStash --replication-factor 1 --partitions 1 --zookeeper Slave1:2181
Created topic "logStash".
[hadoop@Slave1 bin]$
Starting Logstash
Start Logstash on the three machines:
[hadoop@Slave1 ~]$ cd /usr/local/logstash/
[hadoop@Slave1 logstash]$ ls
bin           conf          Gemfile                 lib      NOTICE.TXT
CHANGELOG.md  CONTRIBUTORS  Gemfile.jruby-1.9.lock  LICENSE  vendor
[hadoop@Slave1 logstash]$ cd bin
[hadoop@Slave1 bin]$ ls
logstash      logstash.lib.sh  plugin.bat  rspec.bat
logstash.bat  plugin           rspec       setup.bat
During startup the output looks like the following, including some warnings:
[hadoop@Slave2 bin]$ ./logstash agent -f ../conf/kafkaInput_esOutPut.conf
log4j, [2015-10-28T21:52:07.116]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-3 for topic logStash
log4j, [2015-10-28T21:52:07.118]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-2 for topic logStash
log4j, [2015-10-28T21:52:07.119]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-0 for topic logStash
log4j, [2015-10-28T21:52:07.119]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-4 for topic logStash
log4j, [2015-10-28T21:52:07.120]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-1 for topic logStash
log4j, [2015-10-28T21:52:33.934]  WARN: org.elasticsearch.bootstrap: JNA not found. native methods will be disabled.
log4j, [2015-10-28T21:53:09.347]  WARN: org.elasticsearch.discovery: [logstash-Slave2-4244-11624] waited for 30s and no initial state was set by the discovery
log4j, [2015-10-28T21:53:35.632]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-3 for topic logStash
log4j, [2015-10-28T21:53:35.633]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-2 for topic logStash
log4j, [2015-10-28T21:53:35.634]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-0 for topic logStash
log4j, [2015-10-28T21:53:35.634]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-4 for topic logStash
log4j, [2015-10-28T21:53:35.634]  WARN: kafka.consumer.RangeAssignor: No broker partitions consumed by consumer thread test-consumer-group_Slave2-1446094310356-56dfbfa7-1 for topic logStash
Failed to install template: waited for [30s] {:level=>:error}
Logstash startup completed
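The RangeAssignor warnings are likely a consequence of the numbers chosen earlier: the topic was created with --partitions 1, while each Logstash instance starts consumer_threads => 5 in the same consumer group. Within a group a partition is read by at most one thread, so the remaining threads have nothing to consume. This explanation is my reading of the configuration, not something the original post states. A rough sketch of the arithmetic, with idle_threads as a hypothetical helper:

```shell
# idle_threads PARTITIONS TOTAL_THREADS
# Within one consumer group each partition is read by at most one thread,
# so threads beyond the partition count stay idle.
idle_threads() {
    partitions=$1
    threads=$2
    if [ "$threads" -gt "$partitions" ]; then
        echo $((threads - partitions))
    else
        echo 0
    fi
}

# 1 partition (from --partitions 1), 3 Logstash instances x 5 consumer_threads.
idle_threads 1 $((3 * 5))   # prints 14
```

If the warnings are unwanted, the usual options are to create the topic with more partitions or to lower consumer_threads so that the totals match.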
Sending and receiving data
Start a console producer on the topic just created:
[hadoop@Slave1 ~]$ cd /usr/local/kafka/
[hadoop@Slave1 kafka]$ ls
bin  config  libs  LICENSE  logs  NOTICE
[hadoop@Slave1 kafka]$ bin/kafka-console-producer.sh --broker-list Slave1:9092 --topic logStash
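Start ES (version 1.7.3, used here, does not recognize the -f flag, hence the getopt message; the node still starts in the foreground):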
[hadoop@Slave1 ~]$ cd /usr/local/elasticsearch/
[hadoop@Slave1 elasticsearch]$ bin/elasticsearch -f
getopt: invalid option -- 'f'
[2015-10-29 00:47:27,084][INFO ][node                     ] [Clown] version[1.7.3], pid[5208], build[05d4530/2015-10-15T09:14:17Z]
[2015-10-29 00:47:27,131][INFO ][node                     ] [Clown] initializing ...
[2015-10-29 00:47:27,920][INFO ][plugins                  ] [Clown] loaded [], sites []
[2015-10-29 00:47:28,548][INFO ][env                      ] [Clown] using [1] data paths, mounts [[/ (/dev/sda2)]], net usable_space [9.7gb], net total_space [17.4gb], types [ext4]
[2015-10-29 00:47:43,711][INFO ][node                     ] [Clown] initialized
[2015-10-29 00:47:43,729][INFO ][node                     ] [Clown] starting ...
[2015-10-29 00:47:46,089][INFO ][transport                ] [Clown] bound_address {inet[/0:0:0:0:0:0:0:0:9301]}, publish_address {inet[/192.168.154.158:9301]}
[2015-10-29 00:47:46,606][INFO ][discovery                ] [Clown] elasticsearch/v-jkBhkxSheape14hvMAHw
[2015-10-29 00:47:50,712][INFO ][cluster.service          ] [Clown] new_master [Clown][v-jkBhkxSheape14hvMAHw][Slave1][inet[/192.168.154.158:9301]], reason: zen-disco-join (elected_as_master)
[2015-10-29 00:47:50,985][INFO ][http                     ] [Clown] bound_address {inet[/0:0:0:0:0:0:0:0:9200]}, publish_address {inet[/192.168.154.158:9200]}
[2015-10-29 00:47:50,986][INFO ][node                     ] [Clown] started
[2015-10-29 00:47:51,345][INFO ][gateway                  ] [Clown] recovered [0] indices into cluster_state
[2015-10-29 00:47:51,346][INFO ][cluster.service          ] [Clown] added {[logstash-Slave1-4083-11624][loTUXdCXRVC_WzqzhD3PWg][Slave1][inet[/192.168.154.158:9300]]{data=false, client=true},}, reason: zen-disco-receive(join from node[[logstash-Slave1-4083-11624][loTUXdCXRVC_WzqzhD3PWg][Slave1][inet[/192.168.154.158:9300]]{data=false, client=true}])
[2015-10-29 00:47:54,185][INFO ][cluster.metadata         ] [Clown] [logstash-2015.10.29] creating index, cause [auto(bulk api)], templates [], shards [5]/[1], mappings [logs]
[2015-10-29 00:47:56,201][INFO ][cluster.metadata         ] [Clown] [logstash-2015.10.29] update_mapping [logs] (dynamic)
[2015-10-29 00:47:57,166][INFO ][cluster.metadata         ] [Clown] [logstash-2015.10.29] update_mapping [logs] (dynamic)
Check that ES started successfully:
[hadoop@Slave1 ~]$ curl -X GET http://localhost:9200
{
  "status" : 200,
  "name" : "Clown",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "1.7.3",
    "build_hash" : "05d4530971ef0ea46d0f4fa6ee64dbc8df659682",
    "build_timestamp" : "2015-10-15T09:14:17Z",
    "build_snapshot" : false,
    "lucene_version" : "4.10.4"
  },
  "tagline" : "You Know, for Search"
}
[hadoop@Slave1 ~]$
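For a scripted check, the version number can be extracted from that response. The sketch below uses sed rather than a JSON parser, which is enough for this pretty-printed layout but is not a general JSON solution; es_version is a hypothetical helper, and the sample input is a fragment of the response above.

```shell
# es_version is a hypothetical helper: it reads the root-endpoint response
# on stdin and prints the value of "number" from the version block.
es_version() {
    sed -n 's/.*"number"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Sample input: a fragment of the response shown in this post.
es_version <<'EOF'
{
  "status" : 200,
  "version" : {
    "number" : "1.7.3",
    "lucene_version" : "4.10.4"
  }
}
EOF
```

Against a running node this would be `curl -s http://localhost:9200 | es_version`; an empty result means the node did not answer with the expected JSON.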
Send data through the producer started above:
(Each record has the format source IP, source port, destination IP, destination port; for simplicity, 1,1,1,1 is sent here.)
[hadoop@Slave1 kafka]$ bin/kafka-console-producer.sh --broker-list Slave1:9092 --topic logStash
[2015-10-29 00:39:33,085] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
1,1,1,1
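Typing records by hand works for a single test; for more realistic input, lines in the same four-field layout can be generated and piped to the producer, which reads from standard input. A small sketch, with make_record as a hypothetical helper and made-up sample values:

```shell
# make_record is a hypothetical helper that joins four values into the
# "source IP,source port,destination IP,destination port" layout.
make_record() {
    printf '%s,%s,%s,%s\n' "$1" "$2" "$3" "$4"
}

# Sample values (assumed, not from the original post).
make_record 192.168.154.158 5000 192.168.154.159 9092

# To send records for real, pipe the output into the console producer, e.g.
#   make_record 1 1 1 1 | bin/kafka-console-producer.sh --broker-list Slave1:9092 --topic logStash
```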
View the received data:
[hadoop@Slave1 ~]$ curl -XGET 'localhost:9200/logstash-2015.10.27/_search'
{"error":"IndexMissingException[[logstash-2015.10.27] missing]","status":404}[hadoop@Slave1 ~]$ curl -XGET 'localhost:9200/logstash-2015.10.29/_search'
{"took":260,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":2,"max_score":1.0,"hits":[{"_index":"logstash-2015.10.29","_type":"logs","_id":"AVCykUgg6gAQTB_SuF_V","_score":1.0,"_source":{"message":["1","1","1","1"],"tags":["_jsonparsefailure"],"@version":"1","@timestamp":"2015-10-29T07:39:50.871Z","kafka":{"msg_size":7,"topic":"logStash","consumer_group":"test-consumer-group","partition":0,"key":null},"source_Ip":"1","source_Port":"1","dest_Ip":"1","dest_Port":"1"}},{"_index":"logstash-2015.10.29","_type":"logs","_id":"AVCykUGv6gAQTB_SuF_U","_score":1.0,"_source":{"message":[],"tags":["_jsonparsefailure"],"@version":"1","@timestamp":"2015-10-29T07:39:46.345Z","kafka":{"msg_size":0,"topic":"logStash","consumer_group":"test-consumer-group","partition":0,"key":null},"source_Ip":"%{[message][0]}","source_Port":"%{[message][1]}","dest_Ip":"%{[message][2]}","dest_Port":"%{[message][3]}"}}]}}[hadoop@Slave1 ~]$
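The first query fails because no index exists for 2015.10.27: the index names seen in this post follow the pattern logstash-YYYY.MM.DD, and the events carry a @timestamp of 2015-10-29. A minimal sketch for deriving the index name from a date, assuming that naming pattern (index_for_date is a hypothetical helper):

```shell
# index_for_date YYYY-MM-DD
# Prints the daily index name in the logstash-YYYY.MM.DD pattern seen above.
index_for_date() {
    printf 'logstash-%s\n' "$(printf '%s' "$1" | tr '-' '.')"
}

index_for_date 2015-10-29   # prints logstash-2015.10.29

# For the current UTC day, `date -u +logstash-%Y.%m.%d` gives the same form.
```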
[hadoop@Slave1 ~]$ curl -XGET 'localhost:9200/logstash-2015.10.29/_search?pretty'
{
  "took" : 26,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "logstash-2015.10.29",
      "_type" : "logs",
      "_id" : "AVCykUgg6gAQTB_SuF_V",
      "_score" : 1.0,
      "_source":{"message":["1","1","1","1"],"tags":["_jsonparsefailure"],"@version":"1","@timestamp":"2015-10-29T07:39:50.871Z","kafka":{"msg_size":7,"topic":"logStash","consumer_group":"test-consumer-group","partition":0,"key":null},"source_Ip":"1","source_Port":"1","dest_Ip":"1","dest_Port":"1"}
    }, {
      "_index" : "logstash-2015.10.29",
      "_type" : "logs",
      "_id" : "AVCykUGv6gAQTB_SuF_U",
      "_score" : 1.0,
      "_source":{"message":[],"tags":["_jsonparsefailure"],"@version":"1","@timestamp":"2015-10-29T07:39:46.345Z","kafka":{"msg_size":0,"topic":"logStash","consumer_group":"test-consumer-group","partition":0,"key":null},"source_Ip":"%{[message][0]}","source_Port":"%{[message][1]}","dest_Ip":"%{[message][2]}","dest_Port":"%{[message][3]}"}
    } ]
  }
}
[hadoop@Slave1 ~]$
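The second hit comes from an empty message (msg_size 0): its message array is empty, so the %{[message][N]} references in add_field have nothing to resolve to and are stored literally. The sketch below imitates that fallback in shell, modelling a missing element as an empty string; field_or_placeholder is a hypothetical helper, not a Logstash function. Separately, the _jsonparsefailure tag on both hits suggests the input tried to decode the payload as JSON before the filter ran; that is my reading of the output, not something the post states.

```shell
# field_or_placeholder VALUE PLACEHOLDER
# Prints VALUE when it is non-empty, otherwise the unresolved placeholder,
# imitating what the stored documents above show for an empty message.
field_or_placeholder() {
    if [ -n "$1" ]; then
        printf '%s\n' "$1"
    else
        printf '%s\n' "$2"
    fi
}

field_or_placeholder "1" '%{[message][0]}'   # element present
field_or_placeholder ""  '%{[message][0]}'   # element missing
```

Skipping blank input lines in the producer avoids indexing such placeholder documents.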
References:
http://blog.csdn.net/xuguokun1986/article/details/49452101
This post extends the content of that blog post.
Source: http://blog.csdn.net/wang_zhenwei/article/details/49493131