Spark Structure Streaming(一)之简介
一、Structure Streaming
結構化流是基于Spark SQL引擎構建的可伸縮且容錯的流處理引擎。可以像對靜態數據進行批處理計算一樣,來表示流計算。
當流數據繼續到達時,Spark SQL引擎將負責遞增地,連續地運行它并更新最終結果。可以在Scala,Java,Python或R中使用Dataset / DataFrame API來表示流聚合,事件時間窗口,流到批處理聯接等。計算是在同一優化的Spark SQL引擎上執行的。最后,該系統通過檢查點和預寫日志來確保端到端的一次容錯保證。簡而言之,結構化流提供了快速,可擴展,容錯,端到端的精確一次流處理,而用戶無需推理流。
在內部,默認情況下,結構化流查詢是使用微批量處理引擎處理的,該引擎將數據流作為一系列小批量作業處理,從而實現了低至100毫秒的端到端延遲以及一次精確的容錯保證。但是,從Spark 2.3開始,我們引入了一種稱為“連續處理”的新低延遲處理模式,該模式可以實現一次最少保證的低至1毫秒的端到端延遲。在不更改查詢中的Dataset / DataFrame操作的情況下,您將能夠根據應用程序需求選擇模式。
?
二、Structure Streaming與Spark Streaming區別
(1)流模型
- Spark Streaming
?Spark Streaming采用微批的處理方法。每一個批處理間隔的為一個批,也就是一個RDD,對RDD進行操作就可以源源不斷的接收、處理數據。
- Structured Streaming
?Structured Streaming將實時數據當做被連續追加的表。流上的每一條數據都類似于將一行新數據添加到表中。
“輸出”定義為寫到外部存儲器的內容。可以在不同的模式下定義輸出:
-
完整模式-整個更新后的結果表將被寫入外部存儲器。由存儲連接器決定如何處理整個表的寫入。
-
追加模式-僅將自上次觸發以來追加在結果表中的新行寫入外部存儲器。這僅適用于預期結果表中現有行不會更改的查詢。
-
更新模式-僅自上次觸發以來在結果表中已更新的行將被寫入外部存儲(自Spark 2.1.1起可用)。請注意,這與完成模式的不同之處在于此模式僅輸出自上次觸發以來已更改的行。如果查詢不包含聚合,則等效于追加模式。
注意:每種模式都適用于某些類型的查詢。
(2)數據集API
- Spark Streaming中的DStream編程接口是RDD
- Structured Streaming使用DataFrame和Dataset
(3)處理事件時間和延遲數據
Process Time:流處理引擎接收到數據的時間,Event Time:數據產生的時間
- Spark Streaming
Spark Streaming中由于其微批的概念,會將一段時間內接收的數據放入一個批內,進而對數據進行處理。劃分批的時間是Process Time,而不是Event Time,Spark Streaming沒有提供對Event Time的支持。
- Structured Streaming
Structured Streaming提供了基于事件時間處理數據的功能,如果數據包含事件的時間戳,就可以基于事件時間進行處理。
事件時間是嵌入數據本身的時間。對于許多應用程序,您可能希望在此事件時間進行操作。
例如,如果要獲取每分鐘由IoT設備生成的事件數,則可能要使用生成數據的時間(即數據中的事件時間),而不是Spark收到的時間。他們。此事件時間在此模型中非常自然地表達-設備中的每個事件都是表中的一行,而事件時間是該行中的列值。這允許基于窗口的聚合(例如,每分鐘的事件數)只是事件時間列上的一種特殊類型的分組和聚合-每個時間窗口都是一個組,每行可以屬于多個窗口/組。
由于Spark正在更新結果表,因此它具有完全控制權,可以在有較晚數據時更新舊聚合,并可以清除舊聚合以限制中間狀態數據的大小。
(3)容錯性
兩者在容錯性都使用了checkpoint機制。
checkpoint通過設置檢查點,將數據保存到文件系統,在出現出故障的時候進行數據恢復。
- Spark Streaming
在spark streaming中,如果程序的代碼修改重新提交任務時,是不能從checkpoint中恢復數據,需要刪除checkpoint目錄。
- Structured Streaming
在structured streaming中,對于指定的代碼修改操作,不影響修改后從checkpoint中恢復數據。
(4)Output Sinks
- Spark Streaming
Spark Streaming只提供Foreach sink
- Structured Streaming
Structured Streaming提供File sink、Kafka sink、Foreach sink、Console sink、Memory sink
?
三、Structured Streaming例子
偵聽TCP套接字的數據服務器接收到的文本數據的字數
SparkSession sparkSession = SparkSession.builder().appName("structuredStreaming").getOrCreate();//創建輸入數據源 Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 8100).load();//轉換格式 Dataset<String> words = lines.as(Encoders.STRING()).flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());//統計 Dataset<Row> wordCounts = words.groupBy("value").count();//創建輸出流 StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();//等待結束 query.awaitTermination();?
總結
以上是生活随笔為你收集整理的Spark Structure Streaming(一)之简介的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 路由器怎么设置能最快路由器如何设置网速最
- 下一篇: MacBook怎么连接移动硬盘MacBo