基于 ELK Stack 和 Spark Streaming 的日志处理平台设计与实现
概述
大數據時代,隨著數據量不斷增長,存儲與計算集群的規模也逐漸擴大,幾百上千臺的云計算環境已不鮮見。現在的集群所需要解決的問題不僅僅是高性能、高可靠性、高可擴展性,還需要面對易維護性以及數據平臺內部的數據共享性等諸多挑戰。優秀的系統運維平臺既能實現數據平臺各組件的集中式管理、方便系統運維人員日常監測、提升運維效率,又能反饋系統運行狀態給系統開發人員。例如采集數據倉庫的日志可以按照時間序列查看各數據庫實例各種級別的日志數量與占比,采集 DB2 表空間數據分析可得到數據庫集群健康狀態,分析應用服務器的日志可以查看出錯最多的模塊、下載最多的文件、使用最多的功能等。大數據時代的業務與運維將緊密的結合在一起。
日志
1. 什么是日志
日志是帶時間戳的基于時間序列的機器數據,包括 IT 系統信息(服務器、網絡設備、操作系統、應用軟件)、物聯網各種傳感器信息。日志可以反映用戶實際行為,是真實的數據。
2. 日志處理方案演進
圖 1. 日志處理方案經歷的版本迭代
- 日志處理 v1.0:日志沒有集中式處理;只做事后追查,黑客入侵后刪除日志無法察覺;使用數據庫存儲日志,無法勝任復雜事務處理。
- 日志處理 v2.0:使用 Hadoop 平臺實現日志離線批處理,缺點是實時性差;使用 Storm 流處理框架、Spark 內存計算框架處理日志,但 Hadoop/Storm/Spark 都是編程框架,并不是拿來即用的平臺。
- 日志處理 v3.0:使用日志實時搜索引擎分析日志,特點:第一是快,日志從產生到搜索分析出結果只有數秒延時;第二是大,每天處理 TB 日志量;第三是靈活,可搜索分析任何日志。作為代表的解決方案有 Splunk、ELK、SILK。
圖 2. 深度整合 ELK、Spark、Hadoop 構建日志分析系統
ELK Stack
ELK Stack 是開源日志處理平臺解決方案,背后的商業公司是 Elastic(https://www.elastic.co/)。它由日志采集解析工具 Logstash、基于 Lucene 的全文搜索引擎 Elasticsearch、分析可視化平臺 Kibana 組成。目前 ELK 的用戶有 Adobe、Microsoft、Mozilla、Facebook、Stackoverflow、Cisco、ebay、Uber 等諸多知名廠商。
1. Logstash
Logstash 是一種功能強大的信息采集工具,類似于 Hadoop 生態圈里的 Flume。通常在其配置文件規定 Logstash 如何處理各種類型的事件流,一般包含 input、filter、output 三個部分。Logstash 為各個部分提供相應的插件,因而有 input、filter、output 三類插件完成各種處理和轉換;另外 codec 類的插件可以放在 input 和 output 部分通過簡單編碼來簡化處理過程。下面以 DB2 的一條日志為例。
圖 3.DB2 數據庫產生的半結構化日志樣例
這是一種多行的日志,每一條日志以:“2014-10-19-12.19.46.033037-300”格式的時間戳為起始標志。可以在 input 部分引入 codec 插件 multiline,來將一條日志的多行文本封裝到一條消息(message)中。
| 1 2 3 4 5 6 7 8 9 10 | input { ?file { ?path => "path/to/filename" ?codec => multiline { ?pattern => "^\d{4}-\d{2}-\d{2}-\d{2}\.\d{2}\.\d{2}\.\d{6}[\+-]\d{3}" ?negate => true ?what => previous ?} ?} } |
使用 file 插件導入文件形式的日志,而嵌入的 codec 插件 multiline 的參數 pattern 就規定了分割日志條目的時間戳格式。在 DataStage 多行日志的實際應用中,有時一條日志會超過 500 行,這超出了 multiline 組件默認的事件封裝的最大行數,這需要我們在 multiline 中設置 max_lines 屬性。
經過 input 部分讀入預處理后的數據流入 filter 部分,其使用 grok、mutate 等插件來過濾文本和匹配字段,并且我們自己可以為事件流添加額外的字段信息:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | filter { ?mutate{ ?gsub => ['message', "\n", " "] ?} ?grok { ?match => { "message" => ?"(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY}-%{HOUR}\.%{MINUTE}\.%{SECOND})%{INT:timezone}(?:%{SPACE}%{WORD:recordid}%{SPACE})(?:LEVEL%{SPACE}:%{SPACE}%{DATA:level}%{SPACE})(?:PID%{SPACE}:%{SPACE}%{INT:processid}%{SPACE})(?:TID%{SPACE}:%{SPACE}%{INT:threadid}%{SPACE})(?:PROC%{SPACE}:%{SPACE}%{DATA:process}%{SPACE})?(?:INSTANCE%{SPACE}:%{SPACE}%{WORD:instance}%{SPACE})?(?:NODE%{SPACE}:%{SPACE}%{WORD:node}%{SPACE})?(?:DB%{SPACE}:%{SPACE}%{WORD:dbname}%{SPACE})?(?:APPHDL%{SPACE}:%{SPACE}%{NOTSPACE:apphdl}%{SPACE})?(?:APPID%{SPACE}:%{SPACE}%{NOTSPACE:appid}%{SPACE})?(?:AUTHID%{SPACE}:%{SPACE}%{WORD:authid}%{SPACE})?(?:HOSTNAME%{SPACE}:%{SPACE}%{HOSTNAME:hostname}%{SPACE})?(?:EDUID%{SPACE}:%{SPACE}%{INT:eduid}%{SPACE})?(?:EDUNAME%{SPACE}:%{SPACE}%{DATA:eduname}%{SPACE})?(?:FUNCTION%{SPACE}:%{SPACE}%{DATA:function}%{SPACE})(?:probe:%{SPACE}%{INT:probe}%{SPACE})%{GREEDYDATA:functionlog}" ?} ?} ?date { ?match => [ "timestamp", "YYYY-MM-dd-HH.mm.ss.SSSSSS" ] ?} ?} |
前面 input 部分的 multiline 插件將一條多行日志項轉化為一行,并以“\n”替代實際的換行符。為了便于后面處理,這里的 mutate 插件就是將這些“\n”替換為空格。而 grok 插件用于匹配提取日志項中有意義的字段信息。最后的 date 插件則是按格式“YYYY-MM-dd-HH.mm.ss.SSSSSS”解析提取的時間戳字段,并賦給系統默認的時間戳字段“@timestamp”。Output 插件用于指定事件流的去向,可以是消息隊列、全文搜索引擎、TCP Socket、Email 等幾十種目標端。
2. Elasticsearch
Elasticsearch 是基于 Lucene 的近實時搜索平臺,它能在一秒內返回你要查找的且已經在 Elasticsearch 做了索引的文檔。它默認基于 Gossip 路由算法的自動發現機制構建配置有相同 cluster name 的集群,但是有的時候這種機制并不可靠,會發生腦裂現象。鑒于主動發現機制的不穩定性,用戶可以選擇在每一個節點上配置集群其他節點的主機名,在啟動集群時進行被動發現。
Elasticsearch 中的 Index 是一組具有相似特征的文檔集合,類似于關系數據庫模型中的數據庫實例,Index 中可以指定 Type 區分不同的文檔,類似于數據庫實例中的關系表,Document 是存儲的基本單位,都是 JSON 格式,類似于關系表中行級對象。我們處理后的 JSON 文檔格式的日志都要在 Elasticsearch 中做索引,相應的 Logstash 有 Elasticsearch output 插件,對于用戶是透明的。
Hadoop 生態圈為大規模數據集的處理提供多種分析功能,但實時搜索一直是 Hadoop 的軟肋。如今,Elasticsearch for Apache Hadoop(ES-Hadoop)彌補了這一缺陷,為用戶整合了 Hadoop 的大數據分析能力以及 Elasticsearch 的實時搜索能力.
圖 4. 應用 es-hadoop 整合 Hadoop Ecosystem 與 Elasticsearch 架構圖(https://www.elastic.co/products/hadoop)
3. Kibana
Kibana 是專門設計用來與 Elasticsearch 協作的,可以自定義多種表格、柱狀圖、餅狀圖、折線圖對存儲在 Elasticsearch 中的數據進行深入挖掘分析與可視化。下圖定制的儀表盤可以動態監測數據庫集群中每個數據庫實例產生的各種級別的日志。
圖 5. 實時監測 DB2 實例運行狀態的動態儀表盤
Kafka
Kafka 是 LinkedIn 開源的分布式消息隊列,它采用了獨特的消費者-生產者架構實現數據平臺各組件間的數據共享。集群概念中的 server 在 Kafka 中稱之為 broker,它使用主題管理不同類別的數據,比如 DB2 日志歸為一個主題,tomcat 日志歸為一個主題。我們使用 Logstash 作為 Kafka 消息的生產者時,output 插件就需要配置好 Kafka broker 的列表,也就是 Kafka 集群主機的列表;相應的,用作 Kafka 消費者角色的 Logstash 的 input 插件就要配置好需要訂閱的 Kafka 中的主題名稱和 ZooKeeper 主機列表。Kafka 通過將數據持久化到硬盤的 Write Ahead Log(WAL)保證數據可靠性與順序性,但這并不會影響實時數據的傳輸速度,實時數據仍是通過內存傳輸的。Kafka 是依賴于 ZooKeeper 的,它將每組消費者消費的相應 topic 的偏移量保存在 ZooKeeper 中。據稱 LinkedIn 內部的 Kafka 集群每天已能處理超過 1 萬億條消息。
圖 6. 基于消息訂閱機制的 Kafka 架構
除了可靠性和獨特的 push&pull 架構外,相較于其他消息隊列,Kafka 還擁有更大的吞吐量:
圖 7. 基于消息持久化機制的消息隊列吞吐量比較
Spark Streaming
Spark 由加州大學伯克利分校 AMP 實驗室 (Algorithms, Machines, and People Lab) 開發,可用來構建大型的、低延遲的數據分析應用程序。它將批處理、流處理、即席查詢融為一體。Spark 社區也是相當火爆,平均每三個月迭代一次版本更是體現了它在大數據處理領域的地位。
Spark Streaming 不同于 Storm,Storm 是基于事件級別的流處理,Spark Streaming 是 mini-batch 形式的近似流處理的微型批處理。Spark Streaming 提供了兩種從 Kafka 中獲取消息的方式:
第一種是利用 Kafka 消費者高級 API 在 Spark 的工作節點上創建消費者線程,訂閱 Kafka 中的消息,數據會傳輸到 Spark 工作節點的執行器中,但是默認配置下這種方法在 Spark Job 出錯時會導致數據丟失,如果要保證數據可靠性,需要在 Spark Streaming 中開啟Write Ahead Logs(WAL),也就是上文提到的 Kafka 用來保證數據可靠性和一致性的數據保存方式。可以選擇讓 Spark 程序把 WAL 保存在分布式文件系統(比如 HDFS)中。
第二種方式不需要建立消費者線程,使用 createDirectStream 接口直接去讀取 Kafka 的 WAL,將 Kafka 分區與 RDD 分區做一對一映射,相較于第一種方法,不需再維護一份 WAL 數據,提高了性能。讀取數據的偏移量由 Spark Streaming 程序通過檢查點機制自身處理,避免在程序出錯的情況下重現第一種方法重復讀取數據的情況,消除了 Spark Streaming 與 ZooKeeper/Kafka 數據不一致的風險。保證每條消息只會被 Spark Streaming 處理一次。以下代碼片通過第二種方式讀取 Kafka 中的數據:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | // Create direct kafka stream with brokers and topics JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( ?jssc, ?String.class, ?String.class, ?StringDecoder.class, ?StringDecoder.class, ?kafkaParams, ?topicsSet); messages.foreachRDD(new Function<JavaPairRDD<String,String>,Void>(){ ????public Void call(JavaPairRDD<String, String> v1) ?throws Exception { ????????v1.foreach(new VoidFunction<Tuple2<String, String>>(){ ?public void call(Tuple2<String, String> tuple2) { ?try{ ?JSONObject a = new JSONObject(tuple2._2); ?... |
Spark Streaming 獲取到消息后便可以通過 Tuple 對象自定義操作消息,如下圖是針對 DB2 數據庫日志的郵件告警,生成告警郵件發送到 Notes 郵箱:
圖 8. 基于 Spark Streaming 對 DB2 異常日志實現 Notes 郵件告警
互聯網行業日志處理方案舉例介紹與應用
1. 新浪
新浪采用的技術架構是常見的 Kafka 整合 ELK Stack 方案。Kafka 作為消息隊列用來緩存用戶日志;使用 Logstash 做日志解析,統一成 JSON 格式輸出給 Elasticsearch;使用 Elasticsearch 提供實時日志分析與強大的搜索和統計服務;Kibana 用作數據可視化組件。該技術架構目前服務的用戶包括微博、微盤、云存儲、彈性計算平臺等十多個部門的多個產品的日志搜索分析業務,每天處理約 32 億條(2TB)日志。
新浪的日志處理平臺團隊對 Elasticsearch 做了大量優化(比如調整 max open files 等),并且開發了一個獨立的 Elasticsearch Index 管理系統,負責索引日常維護任務(比如索引的創建、優化、刪除、與分布式文件系統的數據交換等)的調度及執行。為 Elasticsearch 安裝了國內中文分詞插件 elasticsearch-analysis-ik,滿足微盤搜索對中文分詞的需求。(見參考資源 2)
2. 騰訊
騰訊藍鯨數據平臺告警系統的技術架構同樣基于分布式消息隊列和全文搜索引擎。但騰訊的告警平臺不僅限于此,它的復雜的指標數據統計任務通過使用 Storm 自定義流式計算任務的方法實現,異常檢測的實現利用了曲線的時間周期性和相關曲線之間的相關性去定義動態的閾值,并且基于機器學習算法實現了復雜的日志自動分類(比如 summo logic)。
告警平臺把撥測(定時 curl 一下某個 url,有問題就告警)、日志集中檢索、日志告警(5 分鐘 Error 大于 X 次告警)、指標告警(cpu 使用率大于 X 告警)整合進同一個數據管線,簡化了整體的架構。(見參考資源 3)
3. 七牛
七牛采用的技術架構為 Flume+Kafka+Spark,混部在 8 臺高配機器。根據七牛技術博客提供的數據,該日志處理平臺每天處理 500 億條數據,峰值 80 萬 TPS。?
Flume 相較于 Logstash 有更大的吞吐量,而且與 HDFS 整合的性能比 Logstash 強很多。七牛技術架構選型顯然考慮了這一點,七牛云平臺的日志數據到 Kafka 后,一路同步到 HDFS,用于離線統計,另一路用于使用 Spark Streaming 進行實時計算,計算結果保存在 Mongodb 集群中。(見參考資源 4)
任何解決方案都不是十全十美的,具體采用哪些技術要深入了解自己的應用場景。就目前日志處理領域的開源組件來說,在以下幾個方面還比較欠缺:
- Logstash 的內部狀態獲取不到,目前沒有好的成熟的監控方案。
- Elasticsearch 具有海量存儲海量聚合的能力,但是同 Mongodb 一樣,并不適合于寫入數據非常多(1 萬 TPS 以上)的場景。
- 缺乏真正實用的異常檢測方法;實時統計方面缺乏成熟的解決方案,Storm 就是一個底層的執行引擎,而 Spark 還缺少時間窗口等抽象。
- 對于日志自動分類,還沒有開源工具可以做到 summo logic 那樣的效果。
結束語
大數據時代的運維管理意義重大,好的日志處理平臺可以事半功倍的提升開發人員和運維人員的效率。本文通過簡單用例介紹了 ELK Stack、Kafka 和 Spark Streaming 在日志處理平臺中各自在系統架構中的功能。現實中應用場景繁多復雜、數據形式多種多樣,日志處理工作不是一蹴而就的,分析處理過程還需要在實踐中不斷挖掘和優化,筆者也將致力于 DB2 數據庫運行狀態更細節數據的收集和更全面細致的監控。
相關主題
- 1. 參考“日志: 每個軟件工程師都應該知曉的實時數據集成提取的那點事”,詳細了解日志處理。
- 2. 查看文章“新浪是如何分析處理 32 億條實時日志的?”,了解新浪的日志處理方案設計。
- 3. 查看文章“騰訊藍鯨數據平臺之告警系統”,了解騰訊在大型系統運維工程上的實踐。
- 4. 查看文章“七牛是如何搞定每天 500 億條日志的”,了解七牛海量日志處理解決方案。
- 5. 查看文章“Mesos 在去哪兒網的應用”,了解去哪兒網利用日志在集群管理中的實踐。
- 6. 參考網站“https://softwaremill.com/mqperf/”,了解基于消息持久化的各消息隊列各種負載下的性能分析。
- 在 developerWorks 大數據和分析專區,了解關于大數據的更多信息,獲取技術文檔、how-to 文章、培訓、下載、產品信息以及其他資源。
https://www.ibm.com/developerworks/cn/analytics/library/ba-1512-elkstack-logprocessing/index.html
總結
以上是生活随笔為你收集整理的基于 ELK Stack 和 Spark Streaming 的日志处理平台设计与实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【学习笔记】分布式Tensorflow
- 下一篇: 高性能日志框架 Log4a 原理分析