logstash通过kafka传输nginx日志(三)
單個(gè)進(jìn)程 logstash 可以實(shí)現(xiàn)對(duì)數(shù)據(jù)的讀取、解析和輸出處理。但是在生產(chǎn)環(huán)境中,從每臺(tái)應(yīng)用服務(wù)器運(yùn)行 logstash 進(jìn)程并將數(shù)據(jù)直接發(fā)送到 Elasticsearch 里,顯然不是第一選擇:第一,過(guò)多的客戶端連接對(duì) Elasticsearch 是一種額外的壓力;第二,網(wǎng)絡(luò)抖動(dòng)會(huì)影響到 logstash 進(jìn)程,進(jìn)而影響生產(chǎn)應(yīng)用;第三,運(yùn)維人員未必愿意在生產(chǎn)服務(wù)器上部署 Java,或者讓 logstash 跟業(yè)務(wù)代碼爭(zhēng)奪 Java 資源。
所以,在實(shí)際運(yùn)用中,logstash 進(jìn)程會(huì)被分為兩個(gè)不同的角色。運(yùn)行在應(yīng)用服務(wù)器上的,盡量減輕運(yùn)行壓力,只做讀取和轉(zhuǎn)發(fā),這個(gè)角色叫做 shipper;運(yùn)行在獨(dú)立服務(wù)器上,完成數(shù)據(jù)解析處理,負(fù)責(zé)寫(xiě)入 Elasticsearch 的角色,叫 indexer。
?
?
Kafka 是一個(gè)高吞吐量的分布式發(fā)布訂閱日志服務(wù),具有高可用、高性能、分布式、高擴(kuò)展、持久性等特性。和Redis做輕量級(jí)消息隊(duì)列不同,Kafka利用磁盤(pán)做消息隊(duì)列,所以也就無(wú)所謂消息緩沖時(shí)的磁盤(pán)問(wèn)題。生產(chǎn)環(huán)境中還是推薦使用Kafka做消息隊(duì)列。此外,如果公司內(nèi)部已經(jīng)有 Kafka 服務(wù)在運(yùn)行,logstash也可以快速接入,免去重復(fù)建設(shè)的麻煩。
一、Logstash搭建
詳細(xì)搭建可以參考Logstash安裝搭建(一)。
二、配置Shipper
Shipper 即為Nginx服務(wù)器上運(yùn)行的 logstash 進(jìn)程,logstash 通過(guò)?logstash-input-file 寫(xiě)入,然后通過(guò)?logstash-output-kafka 插件將日志寫(xiě)入到 kafka 集群中。
Logstash使用一個(gè)名叫 FileWatch 的 Ruby Gem 庫(kù)來(lái)監(jiān)聽(tīng)文件變化。這個(gè)庫(kù)支持 glob 展開(kāi)文件路徑,而且會(huì)記錄在 .sincedb 的數(shù)據(jù)庫(kù)文件來(lái)跟蹤被監(jiān)聽(tīng)的日志文件的當(dāng)前讀取位置。
.sincedb 文件中記錄了每個(gè)被監(jiān)聽(tīng)的文件的 inode, major number, minor number 和 pos。?Input配置實(shí)例
input {file {path => "/var/log/nginx/log_access.log"type => "nginx-access"discover_interval => 15 #logstash 每隔多久去檢查一次被監(jiān)聽(tīng)的 path 下是否有新文件。默認(rèn)值是 15 秒。sincedb_path => "/etc/logstash/.sincedb" #定義sincedb文件的位置start_position => "beginning" #定義文件讀取的位置 } }? 其他配置詳解:
exclude 不想被監(jiān)聽(tīng)的文件可以排除出去。 close_older 已經(jīng)監(jiān)聽(tīng)的文件,若超過(guò)這個(gè)時(shí)間內(nèi)沒(méi)有更新,就關(guān)閉監(jiān)聽(tīng)該文件的句柄。默認(rèn)為:3600s,即一小時(shí)。 ignore_older 在每次檢查文件列表時(shí),若文件的最后修改時(shí)間超過(guò)該值,則忽略該文件。默認(rèn)為:86400s,即一天。 sincedb_path 定義 .sincedb 文件路徑,默認(rèn)為 $HOME/.sincedb 。 sincedb_write_interval 間隔多久寫(xiě)一次sincedb文件,默認(rèn)15s。 stat_interval 每隔多久檢查被監(jiān)聽(tīng)文件狀態(tài)(是否有更新),默認(rèn)為1s。 start_position logstash從什么位置開(kāi)始讀取文件數(shù)據(jù)。默認(rèn)為結(jié)束位置,類似 tail -f 的形式。設(shè)置為“beginning”,則從頭讀取,類似 cat ,到最后一行以后變成為 tail -f 。Output配置實(shí)例
以下配置可以實(shí)現(xiàn)對(duì) kafka producer 的基本使用。生產(chǎn)者更多詳細(xì)配置請(qǐng)查看 Kafka 官方文檔中生產(chǎn)者部分配置文檔。
output {kafka {bootstrap_servers => "localhost:9092" #生產(chǎn)者topic_id => "nginx-access-log" #設(shè)置寫(xiě)入kafka的topiccompression_type => "snappy" #消息壓縮模式,默認(rèn)是none,可選gzip、snappy。 } }logstash-out-kafka 其他配置詳解:
compression_type 消息壓縮模式,默認(rèn)是none,有效值為:none、gzip、snappy。 asks 消息確認(rèn)模式,默認(rèn)為1,有效值為:0、1、all。設(shè)置為0,生產(chǎn)者不等待 broker 回應(yīng);設(shè)置為1,生產(chǎn)者會(huì)收到 leader 寫(xiě)入之后的回應(yīng);設(shè)置為all, leader 將要等待 in-sync 中所有的 replication 同步確認(rèn)。 send_buffer_bytes TCP發(fā)送數(shù)據(jù)時(shí)的緩沖區(qū)的大小。logstash-kafka 插件輸入和輸出默認(rèn) codec 為 json 格式。在輸入和輸出的時(shí)候注意下編碼格式。消息傳遞過(guò)程中 logstash 默認(rèn)會(huì)為消息編碼內(nèi)加入相應(yīng)的時(shí)間戳和 hostname 等信息。如果不想要以上信息(一般做消息轉(zhuǎn)發(fā)的情況下),可以使用以下配置,例如:
output {kafka {codec => plain {format => "%{message}"}} }三、搭建配置Kafka
? 搭建配置Kafka可以參考?Kafka集群搭建。
四、配置Indexer
是用logstash-input-kafka插件,從kafka集群中讀取數(shù)據(jù)。
Input配置示例:
input {kafka {zk_connect => "localhost:2181" #zookeeper地址topic_id => "nginx-access-log" #kafka中topic名稱,記得創(chuàng)建該topicgroup_id => "nginx-access-log" #默認(rèn)為“l(fā)ogstash”codec => "plain" #與Shipper端output配置項(xiàng)一致consumer_threads => 1 #消費(fèi)的線程數(shù)decorate_events => true #在輸出消息的時(shí)候回輸出自身的信息,包括:消費(fèi)消息的大小、topic來(lái)源以及consumer的group信息。type => "nginx-access-log" } }更多 logstash-input-kafka 配置可以從 logstash 官方文檔?查看。
Logstash 是一個(gè) input | decode | filter | encode | output 的數(shù)據(jù)流。上述配置中有?codec => "plain"?,即logstash 采用轉(zhuǎn)發(fā)的形式,不會(huì)對(duì)原有信息進(jìn)行編碼轉(zhuǎn)換。豐富的過(guò)濾器插件(Filter)的存在是 logstash 威力強(qiáng)大的重要因素,提供的不單單是過(guò)濾的功能,可以進(jìn)行復(fù)雜的邏輯處理,甚至無(wú)中生有添加新的logstash事件到后續(xù)的流程中去。這里只列舉?logstash-output-elasticsearch 配置。
logstash-output-elasticsearch 配置實(shí)例:
output {elasticsearch {hosts => ["localhost:9200"] //Elasticsearch 地址,多個(gè)地址以逗號(hào)分隔。 index => "logstash-%{type}-%{+YYYY.MM.dd}" //索引命名方式,不支持大寫(xiě)字母(Logstash除外) document_type => "%{type}" //文檔類型 workers => 1flush_size => 20000 //向Elasticsearch批量發(fā)送數(shù)據(jù)的條數(shù) idle_flush_time => 10 //向Elasticsearch批量發(fā)送數(shù)據(jù)的時(shí)間間隔,即使不滿足?flush_size?也會(huì)發(fā)送 template_overwrite => true //設(shè)置為true,將會(huì)把自定義的模板覆蓋logstash自帶模板 } }?
到此就已經(jīng)把Nginx上的日志轉(zhuǎn)發(fā)到Elasticsearch中。
?
轉(zhuǎn)載于:https://www.cnblogs.com/Orgliny/p/5730381.html
總結(jié)
以上是生活随笔為你收集整理的logstash通过kafka传输nginx日志(三)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: http://acm.zzuli.edu
- 下一篇: vagrant boxes