日志服务(SLS)集成 Spark 流计算实战
前言
日志服務作為一站式的日志的采集與分析平臺,提供了各種用戶場景的日志采集能力,通過日志服務提供的各種與·與SDK,采集客戶端(Logtail),Producer,用戶可以非常容易的把各種數據源中的數據采集到日志服務的Logstore中。同時為了便于用戶對日志進行處理,提供了各種支持流式消費的SDK,如各種語言的消費組,與 Spark,Flink,Storm 等各種流計算技術無縫對接的Connector,以便于用戶根據自己的業務場景非常便捷的處理海量日志。
從最早的Spark Streaming到最新的Stuctured Streaming,Spark 一直是最流行的流計算框架之一。使用日志服務的Spark SDK,可以非常方便的在Spark 中消費日志服務中的數據,同時也支持將 Spark 的計算結果寫入日志服務。
日志服務基礎概念
日志服務的存儲層是一個類似Kafka的Append only的FIFO消息隊列,包含如下基本概念:
- 日志(Log):由時間、及一組不定個數的Key-Value對組成。
- 日志組(LogGroup):一組日志的集合,包含相同Meta信息如Topic,Source,Tags等。是讀寫的基本單位。
圖-1 Log與LogGroup的關系
- Shard:分區,LogGroup讀寫基本單元,對應于Kafka的partition。
- Logstore:日志庫,用以存放同一類日志數據。Logstore會包含1個或多個Shard。
- Project:Logstore存放容器,包含一個或者多個Logstore。
準備工作
1)添加Maven依賴:
<dependency><groupId>com.aliyun.emr</groupId><artifactId>emr-logservice_2.11</artifactId><version>1.9.0</version> </dependency>Github源碼下載。
2)計劃消費的日志服務project,logstore以及對應的endpoint。
3)用于訪問日志服務Open API的Access Key。
對 Spark Streaming 的支持
Spark Streaming是Spark最早推出的流計算技術,現在已經進入維護狀態,不再會增加新的功能。但是考慮到Spark Streaming 的使用仍然非常廣泛,我們先從Spark Streaming開始介紹。Spark Streaming 提供了一個DStream 的數據模型抽象,本質是把無界數據集拆分成一個一個的RDD,轉化為有界數據集的流式計算。每個批次處理的數據就是這段時間內從日志服務消費到的數據。
?
圖-2 DStream
Spark Streaming 從日志服務消費支持?Receiver 和?Direct?兩種消費方式。
Receiver模式
Receivers的實現內部實現基于日志服務的消費組(Consumer Library)。數據拉取與處理完全分離。消費組自動均勻分配Logstore內的所有shard到所有的Receiver,并且自動提交checkpoint到SLS。這就意味著Logstore內的shard個數與Spark 實際的并發沒有對應關系。
對于所有的Receiver,接收到的數據默認會保存在Spark Executors中,所以Failover的時候有可能造成數據丟失,這個時候就需要開啟WAL日志,Failover的時候可以從WAL中恢復,防止丟失數據。
SDK將SLS中的每行日志解析為JSON字符串形式,Receiver使用示例如下所示:
object SLSReceiverSample {def main(args: Array[String]): Unit = {val project = "your project"val logstore = "your logstore"val consumerGroup = "consumer group"val endpoint = "your endpoint"val accessKeyId = "access key id"val accessKeySecret = "access key secret"val batchInterval = Milliseconds(5 * 1000)val conf = new SparkConf().setAppName("Test SLS Loghub")val ssc = new StreamingContext(conf, batchInterval)val stream = LoghubUtils.createStream(ssc,project,logstore,consumerGroup,endpoint,accessKeyId,accessKeySecret,StorageLevel.MEMORY_AND_DISK,LogHubCursorPosition.END_CURSOR)stream.checkpoint(batchInterval * 2).foreachRDD(rdd =>rdd.map(bytes => new String(bytes)).top(10).foreach(println))ssc.checkpoint("hdfs:///tmp/spark/streaming")ssc.start()ssc.awaitTermination()} }除Project,Logstore,Access Key 這些基礎配置外,還可以指定StorageLevel,消費開始位置等。
Direct模式
Direct模式不再需要Receiver,也不依賴于消費組,而是使用日志服務的低級API,在每個批次內直接從服務端拉取數據處理。對于Logstore中的每個Shard來說,每個批次都會讀取指定位置范圍內的數據。為了保證一致性,只有在每個批次確認正常結束之后才能把每個Shard的消費結束位置(checkpoint)保存到服務端。
為了實現Direct模式,SDK依賴一個本地的ZooKeeper,每個shard的checkpoint會臨時保存到本地的ZooKeeper,等用戶手動提交checkpoint時,再從ZooKeeper中同步到服務端。Failover時也是先從本地ZooKeeper中嘗試讀上一次的checkpoint,如果沒有讀到再從服務端獲取。
object SLSDirectSample {def main(args: Array[String]): Unit = {val project = "your project"val logstore = "your logstore"val consumerGroup = "consumerGroup"val endpoint = "endpoint"val accessKeyId = "access key id"val accessKeySecret = "access key secret"val batchInterval = Milliseconds(5 * 1000)val zkAddress = "localhost:2181"val conf = new SparkConf().setAppName("Test Direct SLS Loghub")val ssc = new StreamingContext(conf, batchInterval)val zkParas = Map("zookeeper.connect" -> zkAddress)val loghubStream = LoghubUtils.createDirectStream(ssc,project,logstore,consumerGroup,accessKeyId,accessKeySecret,endpoint,zkParas,LogHubCursorPosition.END_CURSOR)loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {println(s"count by key: ${rdd.map(s => {s.sorted(s.length, s)}).countByKey().size}")// 手動更新checkpointloghubStream.asInstanceOf[CanCommitOffsets].commitAsync()})ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directoryssc.start()ssc.awaitTermination()} }Direct模式示例
如何限速
在Receiver中,如果需要限制消費速度,我們只需要調整 Consumer Library 本身的參數即可。而Direct方式是在每個批次開始時從SLS拉取數據,這就涉及到一個問題:一個批次內拉取多少數據才合適。如果太多,一個批次內處理不完,造成處理延時。如果太少會導worker空閑,工作不飽和,消費延時。這個時候我們就需要合理配置拉取的速度和行數,實現一個批次盡可能多處理又能及時完成的目標。理想狀態下Spark 消費的整體速率應該與SLS采集速率一致,才能實現真正的實時處理。
由于SLS的數據模型是以LogGroup作為讀寫的基本單位,而一個LogGroup中可能包含上萬行日志,這就意味著Spark中直接限制每個批次的行數難以實現。因此,Direct限流涉及到兩個配置參數:
| spark.streaming.loghub.maxRatePerShard | 每個批次每個Shard讀取行數,決定了限流的下限 | 10000 |
| spark.loghub.batchGet.step | 每次請求讀取LogGroup個數,決定了限流的粒度 | 100 |
可以通過適當縮小spark.loghub.batchGet.step來控制限流的精度,但是即便如此,在某些情況下還是會存在較大誤差,如一個LogGroup中存在10000行日志,spark.streaming.loghub.maxRatePerShard設置為100,spark.loghub.batchGet.step設置為1,那一個批次內該shard還是會拉取10000行日志。
兩種模式的對比
和Receiver相比,Direct有如下的優勢:
但是也存在一些缺點:
Spark Streaming結果寫入SLS
與消費SLS相反,Spark Streaming的處理結果也可以直接寫入SLS。使用示例:
...val lines = loghubStream.map(x => x)// 轉換函數把結果中每條記錄轉為一行日志def transformFunc(x: String): LogItem = {val r = new LogItem()r.PushBack("key", x)r}val callback = new Callback with Serializable {override def onCompletion(result: Result): Unit = {println(s"Send result ${result.isSuccessful}")}}// SLS producer configval producerConfig = Map("sls.project" -> loghubProject,"sls.logstore" -> targetLogstore,"access.key.id" -> accessKeyId,"access.key.secret" -> accessKeySecret,"sls.endpoint" -> endpoint,"sls.ioThreadCount" -> "2")lines.writeToLoghub(producerConfig,"topic","streaming",transformFunc, Option.apply(callback))ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directoryssc.start()ssc.awaitTermination()對Structured Streaming的支持
Structured? Streaming 并不是最近才出現的技術,而是早在16年就已經出現,但是直到 Spark 2.2.0 才正式推出。其數據模型是基于無界表的概念,流數據相當于往一個表上不斷追加行。
圖-3 無界表模型
與Spark Streaming相比,Structured Streaming主要有如下特點:
SDK使用示例
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{StringType, StructField, StructType}object StructuredStreamingDemo {def main(args: Array[String]) {val spark = SparkSession.builder.appName("StructuredLoghubWordCount").master("local").getOrCreate()import spark.implicits._val schema = new StructType(Array(StructField("content", StringType)))val lines = spark.readStream.format("loghub").schema(schema).option("sls.project", "your project").option("sls.store", "your logstore").option("access.key.id", "your access key id").option("access.key.secret", "your access key secret").option("endpoint", "your endpoint").option("startingoffsets", "latest").load().select("content").as[String]val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete").format("loghub").option("sls.project", "sink project").option("sls.store", "sink logstore").option("access.key.id", "your access key id").option("access.key.secret", "your access key secret").option("endpoint", "your endpoint").option("checkpointLocation", "your checkpoint dir").start()query.awaitTermination()} }代碼解釋:
1)schema 聲明了我們需要的字段,除了日志中的字段外,還有如下的內部字段:
如果沒有指定schema,SDK默認提供一個__value__字段,其內容為由所有字段組成的一個JSON字符串。
2)lines 定義了一個流。
startingoffsets:開始位置,支持:
- latest :日志服務最新寫入位置。強烈建議從latest開始,從其他位置開始意味著需要先處理歷史數據,可能需要等待較長時間才能結束。
- earliest:日志服務中最早的日志對應的位置。
- 或者為每個shard指定一個開始時間,以JSON形式指定。
maxOffsetsPerTrigger:批次讀取行數,SDK中默認是64*1024 。
3)結果寫入到日志服務
format 指定為Loghub即可。
不足之處
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的日志服务(SLS)集成 Spark 流计算实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 投入20亿,赋能1万家,阿里云正式启动云
- 下一篇: 阿里高级技术专家总结6年来的成长和收获