第二部分:Spark进阶篇
第一部分:Spark基礎篇_奔跑者-輝的博客-CSDN博客
第二部分:Spark進階篇_奔跑者-輝的博客-CSDN博客
第三部分:Spark調優篇_奔跑者-輝的博客-CSDN博客
目錄
1 SparkShuffle
1.1 HashShuffle
1.2?SortShuffle
1.2.1 普通模式
1.2.2 bypass機制
2 容錯機制
3 Checkpoint?
3.1 checkpoint 檢查點機制
3.2 checkpoint 與 持久化機制的區別
3.3 checkpoint在spark的2塊應用
3.4 說明SparkStreaming下checkpoint的使用場景
4 廣播變量
5 累加器
6 Kryo序列化
7 Spark中數據的本地化方式分為5種
8 SparkSql執行流程
8.1 RDD 與 SparkSQL 運行時的區別
8.2 explain參看邏輯計劃和物理計劃
8.3 Spark SQL 是如何將數據寫到Hive表的
8.4 SparkSQL中RDD、DataFrame、DataSet三者的轉換
8.4.1?三者共性
8.4.2 三者區別
9 Spark Streaming
9.1 Spark Streaming 基本工作原理
9.2 DStream以及基本工作原理
9.3 Spark Streaming精準一次消費
9.4 SparkStreaming有哪幾種方式消費Kafka中的數據,它們之間的區別是什么
9.5 簡述SparkStreaming窗口函數的原理
9.6 SparkStreaming寫一個WordCount案例
10 Spark中某個task掛掉了,如何知道是哪個task掛掉了
1 SparkShuffle
Spark Shuffle 分為兩種:一種是基于 Hash 的 Shuffle;另一種是基于 Sort 的 Shuffle。
1.1 HashShuffle
未優化的HashShuffle流程:
① 每1個mapTask將不同結果寫到不同buffer中,每個buffer大小為32k,buffer起到數據緩存的作用;
② 每個buffer文件最后對應1個磁盤小文件;
③ reduce task來拉取對應磁盤小文件。
優點:可以省略不必要的排序開銷,避免了排序所需的內存開銷。
缺點:提高性能有限,仍然還會有大量小文件。
優化后的HashShuffle
1.2?SortShuffle
1.2.1 普通模式
普通的sort shuffle:
① maptask的計算結果會寫入到一個內存數據結構里面,內存數據結構默認為5m;
② 在shuffle的時候會有1個定時器,不定期去估算這個內存結構大小;
③ 如果申請成功不會進行溢寫,如果申請不成功,這時候會發生溢寫磁盤;
④ 在溢寫之前內存結構中數據會進行排序分區;
⑤ 然后開始溢寫磁盤,寫磁盤億batch的形式寫,一個batch里是一萬條數據;
⑥ maptask執行完成后,會將這些磁盤小文件合并成一個大的磁盤文件(有序),同時生成一個索引文件;
⑦ reducetask去map端拉取數據的時候,首先解析索引文件,根據索引文件,去拉取數據。
1.2.2 bypass機制
當shuffle read task 的數量小于等于spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為 200),就會啟用bypass機制;
?優點:減少了小文件,不排序,效率較高。
2 容錯機制
RDD的容錯機制又稱Lineage(血統)容錯,Lineage本質上類似于數據庫中的重做日志(Redo Log)
,只不過此重做日志粒度很大,是對全局數據做同樣的重做進而來恢復數據。
RDD的Lineage記錄的是粗顆粒度的特定數據Transformation操作(如filter、map、join等)
。當這個RDD的部分分區數據丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復丟失的數據分區。
3 Checkpoint?
3.1 checkpoint 檢查點機制
所謂的檢查點其實就是通過將RDD中間結果寫入磁盤,由于血緣依賴過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果檢查點之后有節點出現問題,可以從檢查點開始重做血緣,減少了開銷。
對RDD進行checkpoint操作并不會馬上被執行,必須執行Action操作才能觸發。
檢查點機制是我們在spark streaming中用來保障容錯性的主要機制,它可以使spark streaming階段性的把應用數據存儲到諸如hdfs 等可靠存儲系統中,以供恢復時使用。
3.2 checkpoint 與 持久化機制的區別
checkpoint 的數據通常是保存在高可用的文件系統中,比如HDFS中,所以數據丟失可能性極低,穩定性較好;
持久化的數據丟失的可能性更大,因為節點的故障會導致磁盤、內存的數據丟失。
3.3 checkpoint在spark的2塊應用
① 在sparkcore中對RDD做checkpoint,將RDD數據保存到可靠性存儲(hdfs)以便恢復;
通過將計算代價大的RDD checkpoint一下,當下游RDD計算出錯時,可以直接從checkpoint過的RDD那里讀取數據繼續計算。
② 應用在SparkStreaming中,使用checkpoint用來保存Dstreamgraph以及相關配置信息,以便在Driver崩潰重啟的時候能夠接著之前進度繼續進行處理;
3.4 說明SparkStreaming下checkpoint的使用場景
① 使用場景:有狀態的計算 和 容錯的恢復;
② Checkpoint里邊存的:元數據檢查點 和 數據檢查點;
③ 何時啟動checkpoint檢查點: 做狀態計算 和 容錯恢復時。
4 廣播變量
對于經常用到的變量值,在分布式計算中,多個點檢task一定會多次請求這個變量就會產生大量網絡IO,會影響效率,這時可以使用廣播變量的方式將數據廣播到對應Executor端,這個executor啟動的所有的task會共享這個變量,節省了通信的成本和服務器的資源。
優點:不用從Driver端拉取數據了,也不用從其它節點拉取數據了,只需要從自己的Executor端獲取數據就可以了,減少了網絡IO,提高效率。
缺點:數據一旦廣播出去,后期數據發生變化,無法同步到Executor端,有些場景下可以使用redis.
注意事項:
① 不能將一個RDD使用廣播變量廣播出去;
② 廣播變量只能在driver端定義,不能在Executor端定義;
③ 在Driver端可以修改廣播變量值,在Executor端無法修改廣播變量的值。
5 累加器
如果一個變量不被聲明為一個累加器,那么它將在被改變時不會在driver端進行全局匯總,即使是在分布式運行中每個task運行的只是原始變量的一個副本,并不能改變原始變量的值。但是當這個變量被聲明為累加器后,該變量就會有分布式計數的功能,對數據進行聚合,全局匯總總計。
應用場景:
① 能夠準確的統計數據的各種數據;
② 作為調試工具,能夠觀察每個task信息,通過累加器可以在Spark UI觀察到每次task所處理的記錄數。
注意事項:
① 累加器在driver端定義賦初值;
② 累加器只能在driver端讀取,在executor端更新。
綜上,對于廣播變量和累加器總結
廣播變量:用來高效分發較大的對象。 累加器:用來對信息進行聚合?
6 Kryo序列化
Kryo序列化比Java序列化更快更緊湊,但Spark默認序列化是Java序列化并不是Spark序列化,因為Spark并不支持所有序列化類型,而且每次使用都必須進行注冊。注冊只針對于RDD。在DataFrames和DataSet當中自動實現了Kryo序列化。
7 Spark中數據的本地化方式分為5種
① process_local : 進程本地化 , task計算的數據在當前Executor中, 不同task計算的時候可以共同用這一個數據集,效率高,節省資源;
② node_local : ? 節點本地化 , task計算的數據在當前節點上,task計算的時候不需要跨節點拉取數據,速度也是比較快的;
③ no_pref : ? ? ?沒有本地化 ,這個方式的意思就是數據不是本地化的數據; 比如我們的MySQL數據庫, 如果我們需要的數據在MySQL中 就不牽扯到數據本地化的這個說法;
④ rack_local : ? task計算所需要的數據在同機架不同節點上, 這種方式中,task計算的數據在不同節點上,就牽扯到網絡傳輸的問題了 ,效率就沒有那么高了;
⑤ any : 這種方式就是比較隨意的,可能會牽扯到跨機架的數據傳輸,效率最低;
默認的優先級是從上到下依次降低。
8 SparkSql執行流程
① Parser轉換器,第三方類庫 Antlr 實現。將 sql 字符串切分成 Token,根據語義規則解析成一顆AST語法樹,稱為Unresolved Logical Plan 未解決的邏輯計劃;
? ? ? ? 簡單來說就是判斷 SQL 語句是否符合規范,比如select from where 這些關鍵字是否寫對。就算表名字段名寫錯也無所謂。
② Unresolved Logical Plan經過Analyzer分析器,借助于表的真實數據元數據 schema catalog,進行數據類型綁定和函數綁定,解析為 resolved Logical Plan 已解決的邏輯計劃;
? ? ? ? 簡單來說就是判斷 SQL 語句的表名,字段名是否真的在元數據庫里存在。
③ Optimizer優化器,基于各種優化規則(常量折疊,謂詞下推,列裁剪),將上面的resolved Logical Plan進一步轉換為語法樹 Optimized Logical Plan 優化的邏輯計劃。這個過程稱作 RBO(Rule Based Optimizer 基于規則的優化))。
? ? ? ? 簡單來說就是把可執行的SQL 再調整一下,以便跑得更快。
④ query planner 查詢計劃器,基于 planning 計劃過程,將邏輯計劃轉換成多個物理計劃,再根據代價模型 cost model,篩選出代價最小的物理計劃。這個過程稱之為 CBO(Cost Based Optimizer 基于成本的優化)。
? ? ? ? 上面2-3-4步驟合起來,就是 Catalyst 優化器。
⑤ 最后依據最優的物理計劃,生成 java 字節碼,將 SQL 轉換為 DAG,以 RDD 形式進行操作。
8.1 RDD 與 SparkSQL 運行時的區別
和 RDD 不同, SparkSQL 的 Dataset 和 SQL 并不是直接生成計劃交給集群執行, 而是經過了一個叫做 Catalyst 的優化器, 這個優化器能夠自動幫助開發者優化代碼。
8.2 explain參看邏輯計劃和物理計劃
- SparkSQL中的DSL方式:
?spark.sql('select count(1) from test_db.table1').explain(True)
spark.sql('select count(1) from test_db.table1').explain(True)?
- 普通SQL方式 explain extended select count(1) from table1;
?
8.3 Spark SQL 是如何將數據寫到Hive表的
方式一:是利用 Spark RDD 的 API 將數據寫入 hdfs 形成 hdfs 文件,之后再將 hdfs 文件和 hive 表做加載映射;
方式二:利用 Spark SQL 將獲取的數據 RDD 轉換成 DataFrame,再將 DataFrame 寫成緩存表,最后利用 Spark SQL 直接插入 hive 表中。
8.4 SparkSQL中RDD、DataFrame、DataSet三者的轉換
8.4.1?三者共性
① RDD、DataFrame、DataSet全部都是spark平臺下的分布式彈性數據集,為處理超大數據提供便捷;
② 三者都有惰性,在進行創建、轉換,如map方法時,不會立即執行,只有在遇到Action算子,如foreach()時,三者才會開始遍歷運算;
③ 三者都會根據spark的內存情況自動緩沖運算,這樣即使數據量大,也不會擔心內存溢出;
④ 三者都有partition概念;
⑤ 三者都有許多共同函數,如:filter、排序等;
⑥ 在對DateFrame和DataSet進行操作的都需要包支持; 導入 import spark.implicts._ ;
⑦ DataFrame和DataSet均可使用模式匹配獲取各個字段的值和類型;
8.4.2 三者區別
① RDD:
RDD一般和SparkMlib(機器學習庫)同時使用;
RDD不支持SparkSql操作;
② DataFrame
與RDD和DataSet不同,DataFrame每一行固定內容為Row,每一列的值沒法直接訪問,只有通過解析才能獲取各個字段值;
DataFrame和DataSet一般不與SparkMlib同時使用;
DataFrame和DataSet一般都支持SparkSql的操作;
DataFrame和DataSet支持一些特別方便的保存方式,比如:csv ;
③ DataSet
DataSet和DataFrame擁有完全相同的成員函數,區別只是每一行數據類型不同;
DataFrame也可以叫DataSet[Row],每一行類型是Row,不解析,每一行究竟有哪些字段,各個字段又是什么類型都無從得知。
9 Spark Streaming
9.1 Spark Streaming 基本工作原理
spark streaming 是 spark core API 的一種擴展,可以用于進行大規模、高吞吐量、容錯的實時數據流的處理;
原理:接受實時輸入數據流,然后將數據拆分成 batch,比如每收集一秒的數據封裝成一個 batch,然后將每個 batch 交給 spark 的計算引擎進行處理,最后會生產處一個結果數據流,其中的數據也是一個一個的 batch 組成的。
9.2 DStream以及基本工作原理
DStream 是 spark streaming 提供的一種高級抽象,代表了一個持續不斷的數據流;
DStream 可以通過輸入數據源來創建,比如 Kafka、flume 等,也可以通過其他 DStream 的高階函數來創建,比如 map、reduce、join 和 window 等;
DStream 內部其實不斷產生 RDD,每個 RDD 包含了一個時間段的數據;
Spark streaming 一定是有一個輸入的 DStream 接收數據,按照時間劃分成一個一個的 batch,并轉化為一個 RDD,RDD 的數據是分散在各個子節點的 partition 中。
9.3 Spark Streaming精準一次消費
① 手動維護偏移量;
② 處理完業務數據后,再進行提交偏移量操作
極端情況下,如在提交偏移量時斷網或停電會造成spark程序第二次啟動時重復消費問題,所以在涉及到金額或精確性非常高的場景會使用事物保證精準一次消費。
9.4 SparkStreaming有哪幾種方式消費Kafka中的數據,它們之間的區別是什么
① receiver方式:
將數據拉取到 executor 中做操作,若數據量大,內存 存儲不下,可以通過 WAL,設置了本地存儲,保證數據不丟失,然后使用 Kafka 高級 API 通過 zk 來維護偏移量,保證消費數據。receiver 消費 的數據偏移量是在 zk 獲取的,此方式效率低,容易出現數據丟失。
② 基于Direct 方式:
使用 Kafka 底層 Api,其消費者直接連接 kafka 的分 區上,因為 createDirectStream 創建的 DirectKafkaInputDStream 每 個 batch 所對應的 RDD 的分區與 kafka 分區一一對應,但是需要自己維護偏移量,即用即取,不會給內存造成太大的壓力,效率高。
③ 對比:
基于receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合著WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
基于direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。
在實際生產環境中大都用Direct方式
9.5 簡述SparkStreaming窗口函數的原理
?窗口函數就是在原來定義的SparkStreaming計算批次大小的基礎上再次進行封裝,每次計算多個批次的數據,同時還需要傳遞一個滑動步長的參數,用來設置當次計算任務完成之后下一次從什么地方開始計算。
?
9.6 SparkStreaming寫一個WordCount案例
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.SparkConfobject StreamWordCount {def main(args: Array[String]): Unit = {//1.初始化Spark配置信息val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount") //實時程序里線程數大于2//2.初始化SparkStreamingContextval ssc = new StreamingContext(sparkConf, Seconds(5)) //采集周期為5秒//3.通過監控端口創建DStream,讀進來的數據為一行行val lineStreams = ssc.socketTextStream("NODE01", 9999)//4.將每一行數據做切分,形成一個個單詞val wordStreams = lineStreams.flatMap(_.split(" "))//5.將單詞映射成元組(word,1)val wordAndOneStreams = wordStreams.map((_, 1)) //6.將相同的單詞次數做統計val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)//7.打印wordAndCountStreams.print()//8.啟動采集器SparkStreamingContext,開始執行計算 ssc.start()//9.等待某個批次的任務處理完,在停止服務.ssc.awaitTermination() } }10 Spark中某個task掛掉了,如何知道是哪個task掛掉了
在spark程序中,task有失敗重試機制(根據Spark.task.maxFailures配置,默認是4次),當task任務執行失敗時,并不會直接導致程序drown掉,只是重試了Spark.task.maxFailures 4次后仍然失敗的情況下,程序才會drown掉。
解決:通過“自定義監控器”
需要獲取SparkListenerTaskEnd事件,得繼承SparkListener類,并重寫onTaskEnd方法,在該方法中獲取task失敗reason日志,發郵件給對應的負責人,這樣我們可以在第一時間知道那個task是以什么原因失敗的了。
總結
以上是生活随笔為你收集整理的第二部分:Spark进阶篇的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 易语言多线程批量登录实现一键选中取消选择
- 下一篇: iOS资料-插件第三方文章