2021年大数据Spark(四十四):Structured Streaming概述
?
Apache Spark在2016年的時候啟動了Structured Streaming項目,一個基于Spark SQL的全新流計算引擎Structured Streaming,讓用戶像編寫批處理程序一樣簡單地編寫高性能的流處理程序。
Structured Streaming并不是對Spark Streaming的簡單改進,而是吸取了在開發(fā)Spark SQL和Spark Streaming過程中的經(jīng)驗教訓,以及Spark社區(qū)和Databricks眾多客戶的反饋,重新開發(fā)的全新流式引擎,致力于為批處理和流處理提供統(tǒng)一的高性能API。同時,在這個新的引擎中,也很容易實現(xiàn)之前在Spark Streaming中很難實現(xiàn)的一些功能,比如Event Time(事件時間)的支持,Stream-Stream Join(2.3.0 新增的功能),毫秒級延遲(2.3.0 即將加入的 Continuous Processing)。
?
Structured Streaming概述
Spark Streaming是Apache?Spark早期基于RDD開發(fā)的流式系統(tǒng),用戶使用DStream API來編寫代碼,支持高吞吐和良好的容錯。其背后的主要模型是Micro Batch(微批處理),也就是將數(shù)據(jù)流切成等時間間隔(BatchInterval)的小批量任務來執(zhí)行。
Structured Streaming則是在Spark 2.0加入的,經(jīng)過重新設計的全新流式引擎。它的模型十分簡潔,易于理解。一個流的數(shù)據(jù)源從邏輯上來說就是一個不斷增長的動態(tài)表格,隨著時間的推移,新數(shù)據(jù)被持續(xù)不斷地添加到表格的末尾,用戶可以使用Dataset/DataFrame 或者 SQL 來對這個動態(tài)數(shù)據(jù)源進行實時查詢。
文檔:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html
?
Spark Streaming 不足
Spark Streaming 會接收實時數(shù)據(jù)源的數(shù)據(jù),并切分成很多小的batches,然后被Spark Engine執(zhí)行,產(chǎn)出同樣由很多小的batchs組成的結果流。
?
本質上,這是一種micro-batch(微批處理)的方式處理,用批的思想去處理流數(shù)據(jù)。這種設計讓Spark Streaming面對復雜的流式處理場景時捉襟見肘。
?
Spark Streaming 存在哪些不足,總結一下主要有下面幾點:
?1:使用 Processing Time 而不是 Event Time
Processing Time 是數(shù)據(jù)到達 Spark 被處理的時間,而 Event Time 是數(shù)據(jù)自帶的屬性,一般表示數(shù)據(jù)產(chǎn)生于數(shù)據(jù)源的時間。
比如 IoT 中,傳感器在 12:00:00 產(chǎn)生一條數(shù)據(jù),然后在 12:00:05 數(shù)據(jù)傳送到 Spark,那么 Event Time 就是 12:00:00,而 Processing Time 就是 12:00:05。
Spark Streaming是基于DStream模型的micro-batch模式,簡單來說就是將一個微小時間段(比如說 1s)的流數(shù)據(jù)當前批數(shù)據(jù)來處理。如果要統(tǒng)計某個時間段的一些數(shù)據(jù)統(tǒng)計,毫無疑問應該使用 Event Time,但是因為 Spark Streaming 的數(shù)據(jù)切割是基于Processing Time,這樣就導致使用 Event Time 特別的困難。
?2:Complex, low-level api
DStream(Spark Streaming 的數(shù)據(jù)模型)提供的API類似RDD的API,非常的low level;
當編寫Spark Streaming程序的時候,本質上就是要去構造RDD的DAG執(zhí)行圖,然后通過Spark Engine運行。這樣導致一個問題是,DAG 可能會因為開發(fā)者的水平參差不齊而導致執(zhí)行效率上的天壤之別;
?3:reason about end-to-end application
end-to-end指的是直接input到out,如Kafka接入Spark Streaming然后再導出到HDFS中;
DStream 只能保證自己的一致性語義是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Straming 輸出到外部存儲的語義往往需要用戶自己來保證;
?4:批流代碼不統(tǒng)一
盡管批流本是兩套系統(tǒng),但是這兩套系統(tǒng)統(tǒng)一起來確實很有必要,有時候確實需要將的流處理邏輯運行到批數(shù)據(jù)上面;
Streaming盡管是對RDD的封裝,但是要將DStream代碼完全轉換成RDD還是有一點工作量的,更何況現(xiàn)在Spark的批處理都用DataSet/DataFrameAPI;
?
總結
流式計算一直沒有一套標準化、能應對各種場景的模型,直到2015年Google發(fā)表了The Dataflow Model的論文( https://yq.aliyun.com/articles/73255?)
Google開源Apache Beam項目,基本上就是對Dataflow模型的實現(xiàn),目前已經(jīng)成為Apache的頂級項目,但是在國內使用不多。
國內使用的更多的是Apache Flink,因為阿里大力推廣Flink,甚至把花7億元把Flink母公司收購。
?
使用Yahoo的流基準平臺,要求系統(tǒng)讀取廣告點擊事件,并按照活動ID加入到一個廣告活動的靜態(tài)表中,并在10秒的event-time窗口中輸出活動計數(shù)。
比較了Kafka Streams 0.10.2、Apache Flink 1.2.1和Spark 2.3.0,在一個擁有5個c3.2*2大型Amazon EC2 工作節(jié)點和一個master節(jié)點的集群上(硬件條件為8個虛擬核心和15GB的內存)。
?
上圖(a)展示了每個系統(tǒng)最大穩(wěn)定吞吐量(積壓前的吞吐量),Flink可以達到3300萬,而Structured Streaming可以達到6500萬,近乎兩倍于Flink。這個性能完全來自于Spark SQL的內置執(zhí)行優(yōu)化,包括將數(shù)據(jù)存儲在緊湊的二進制文件格式以及代碼生成。
附錄:【Streaming System系統(tǒng)】設計文章:
?Streaming System 第一章【Streaming 101】
網(wǎng)址:https://blog.csdn.net/xxscj/article/details/84990301
?Streaming System 第二章【The What- Where- When- and How of Data Processing】
網(wǎng)址:https://blog.csdn.net/xxscj/article/details/84989879
?
Structured Streaming 介紹
或許是對Dataflow模型的借鑒,也許是英雄所見略同,Spark在2.0版本中發(fā)布了新的流計算的API:Structured Streaming結構化流。Structured Streaming是一個基于Spark SQL引擎的可擴展、容錯的流處理引擎。
Structured Streaming統(tǒng)一了流、批的編程模型,可以使用靜態(tài)數(shù)據(jù)批處理一樣的方式來編寫流式計算操作,并且支持基于event_time的時間窗口的處理邏輯。隨著數(shù)據(jù)不斷地到達,Spark 引擎會以一種增量的方式來執(zhí)行這些操作,并且持續(xù)更新結算結果。
?
模塊介紹
Structured Streaming 在 Spark 2.0 版本于 2016 年引入,設計思想?yún)⒖己芏嗥渌到y(tǒng)的思想,比如區(qū)分 processing time 和 event time,使用 relational 執(zhí)行引擎提高性能等。同時也考慮了和 Spark 其他組件更好的集成。
?
Structured Streaming 和其他系統(tǒng)的顯著區(qū)別主要如下:
?1:Incremental query model(增量查詢模型)
Structured Streaming 將會在新增的流式數(shù)據(jù)上不斷執(zhí)行增量查詢,同時代碼的寫法和批處理 API(基于Dataframe和Dataset API)完全一樣,而且這些API非常的簡單。
?2:Support for end-to-end application(支持端到端應用)
Structured Streaming 和內置的 connector 使的 end-to-end 程序寫起來非常的簡單,而且 "correct by default"。數(shù)據(jù)源和sink滿足 "exactly-once" 語義,這樣我們就可以在此基礎上更好地和外部系統(tǒng)集成。
?3:復用 Spark SQL 執(zhí)行引擎
Spark SQL 執(zhí)行引擎做了非常多的優(yōu)化工作,比如執(zhí)行計劃優(yōu)化、codegen、內存管理等。這也是Structured Streaming取得高性能和高吞吐的一個原因。
?
???????核心設計
2016年,Spark在2.0版本中推出了結構化流處理的模塊Structured Streaming,核心設計如下:
?1:Input and Output(輸入和輸出)
Structured Streaming 內置了很多 connector 來保證 input 數(shù)據(jù)源和 output sink 保證 exactly-once 語義。
實現(xiàn) exactly-once 語義的前提:
Input 數(shù)據(jù)源必須是可以replay的,比如Kafka,這樣節(jié)點crash的時候就可以重新讀取input數(shù)據(jù),常見的數(shù)據(jù)源包括 Amazon Kinesis, Apache Kafka 和文件系統(tǒng)。
Output sink 必須要支持寫入是冪等的,這個很好理解,如果 output 不支持冪等寫入,那么一致性語義就是 at-least-once 了。另外對于某些 sink, Structured Streaming 還提供了原子寫入來保證 exactly-once 語義。
補充:冪等性:在HTTP/1.1中對冪等性的定義:一次和多次請求某一個資源對于資源本身應該具有同樣的結果(網(wǎng)絡超時等問題除外)。也就是說,其任意多次執(zhí)行對資源本身所產(chǎn)生的影響均與一次執(zhí)行的影響相同。冪等性是系統(tǒng)服務對外一種承諾(而不是實現(xiàn)),承諾只要調用接口成功,外部多次調用對系統(tǒng)的影響是一致的。聲明為冪等的服務會認為外部調用失敗是常態(tài),并且失敗之后必然會有重試。
?2:Program API(編程 API)
Structured Streaming 代碼編寫完全復用 Spark SQL 的 batch API,也就是對一個或者多個 stream 或者 table 進行 query。
?
?
?
query 的結果是 result table,可以以多種不同的模式(追加:append, 更新:update, 完全:complete)輸出到外部存儲中。
另外,Structured Streaming 還提供了一些 Streaming 處理特有的 API:Trigger, watermark, stateful operator。
?3:Execution Engine(執(zhí)行引擎)
復用 Spark SQL 的執(zhí)行引擎;
Structured Streaming 默認使用類似 Spark Streaming 的 micro-batch 模式,有很多好處,比如動態(tài)負載均衡、再擴展、錯誤恢復以及 straggler (straggler 指的是哪些執(zhí)行明顯慢于其他 task 的 task)重試;
提供了基于傳統(tǒng)的 long-running operator 的 continuous(持續(xù))?處理模式;
?4:Operational Features(操作特性)
利用 wal 和狀態(tài)State存儲,開發(fā)者可以做到集中形式的 rollback 和錯誤恢復FailOver。
?
???????編程模型
Structured Streaming將流式數(shù)據(jù)當成一個不斷增長的table,然后使用和批處理同一套API,都是基于DataSet/DataFrame的。如下圖所示,通過將流式數(shù)據(jù)理解成一張不斷增長的表,從而就可以像操作批的靜態(tài)數(shù)據(jù)一樣來操作流數(shù)據(jù)了。
?
在這個模型中,主要存在下面幾個組成部分:
1:Input Table(Unbounded Table),流式數(shù)據(jù)的抽象表示,沒有限制邊界的,表的數(shù)據(jù)源源不斷增加;
2:Query(查詢),對 Input Table 的增量式查詢,只要Input Table中有數(shù)據(jù),立即(默認情況)執(zhí)行查詢分析操作,然后進行輸出(類似SparkStreaming中微批處理);
3:Result Table,Query 產(chǎn)生的結果表;
4:Output,Result Table 的輸出,依據(jù)設置的輸出模式OutputMode輸出結果;
?
核心思想
Structured Streaming最核心的思想就是將實時到達的數(shù)據(jù)看作是一個不斷追加的unbound table無界表,到達流的每個數(shù)據(jù)項就像是表中的一個新行被附加到無邊界的表中,用靜態(tài)結構化數(shù)據(jù)的批處理查詢方式進行流計算。
?
以詞頻統(tǒng)計WordCount案例,Structured Streaming實時處理數(shù)據(jù)的示意圖如下,各行含義:
第一行、表示從TCP Socket不斷接收數(shù)據(jù),使用【nc -lk 9999】;
第二行、表示時間軸,每隔1秒進行一次數(shù)據(jù)處理;
第三行、可以看成是“input unbound table",當有新數(shù)據(jù)到達時追加到表中;
第四行、最終的wordCounts是結果表,新數(shù)據(jù)到達后觸發(fā)查詢Query,輸出的結果;
第五行、當有新的數(shù)據(jù)到達時,Spark會執(zhí)行“增量"查詢,并更新結果集;該示例設置為Complete Mode,因此每次都將所有數(shù)據(jù)輸出到控制臺;
?
上圖中數(shù)據(jù)實時處理說明:
第一、在第1秒時,此時到達的數(shù)據(jù)為"cat dog"和"dog dog",因此可以得到第1秒時的結果集cat=1 dog=3,并輸出到控制臺;
第二、當?shù)?秒時,到達的數(shù)據(jù)為"owl cat",此時"unbound table"增加了一行數(shù)據(jù)"owl cat",執(zhí)行word count查詢并更新結果集,可得第2秒時的結果集為cat=2 dog=3 owl=1,并輸出到控制臺;
第三、當?shù)?秒時,到達的數(shù)據(jù)為"dog"和"owl",此時"unbound table"增加兩行數(shù)據(jù)"dog"和"owl",執(zhí)行word count查詢并更新結果集,可得第3秒時的結果集為cat=2 dog=4 owl=2;
?
使用Structured Streaming處理實時數(shù)據(jù)時,會負責將新到達的數(shù)據(jù)與歷史數(shù)據(jù)進行整合,并完成正確的計算操作,同時更新Result Table。
總結
以上是生活随笔為你收集整理的2021年大数据Spark(四十四):Structured Streaming概述的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(四十三):S
- 下一篇: 2021年大数据Spark(二十九):S