分布式离线计算—Spark—SparkStreaming
原文作者:阿里中間件
原文地址:一文讀懂 Spark 和 Spark Streaming?
目錄
MapReduce 的問題所在
Spark 與 RDD 模型
流計算框架:Spark Streaming
流計算與 SQL:Spark Structured Streaming
系統架構
總結
前言
Apache Spark 是當今最流行的開源大數據處理框架。和人們耳熟能詳的 MapReduce 一樣,Spark 用于進行分布式、大規模的數據處理,但 Spark 作為 MapReduce 的接任者,提供了更高級的編程接口、更高的性能。除此之外,Spark 不僅能進行常規的批處理計算,還提供了流式計算支持。Apache Spark 誕生于大名鼎鼎的 AMPLab(這里還誕生過 Mesos 和 Alluxio),從創立之初就帶有濃厚的學術氣質,其設計目標是為各種大數據處理需求提供一個統一的技術棧。如今 Spark 背后的商業公司 Databricks 創始人也是來自 AMPLab 的博士畢業生。Spark 本身使用 Scala 語言編寫,Scala 是一門融合了面向對象與函數式的“雙范式”語言,運行在 JVM 之上。Spark 大量使用了它的函數式、即時代碼生成等特性。Spark 目前提供了 Java、Scala、Python、R 四種語言的 API,前兩者因為同樣運行在 JVM 上可以達到更原生的支持。
MapReduce 的問題所在
Hadoop 是大數據處理領域的開創者。嚴格來說,Hadoop 不只是一個軟件,而是一整套生態系統,例如 MapReduce 負責進行分布式計算,而 HDFS 負責存儲大量文件。MapReduce 模型的誕生是大數據處理從無到有的飛躍。但隨著技術的進步,對大數據處理的需求也變得越來越復雜,MapReduce 的問題也日漸凸顯。通常,我們將 MapReduce 的輸入和輸出數據保留在 HDFS 上,很多時候,復雜的 ETL、數據清洗等工作無法用一次 MapReduce 完成,所以需要將多個 MapReduce 過程連接起來:
▲ 上圖中只有兩個 MapReduce 串聯,實際上可能有幾十個甚至更多,依賴關系也更復雜。這種方式下,每次中間結果都要寫入 HDFS 落盤保存,代價很大(別忘了,HDFS 的每份數據都需要冗余若干份拷貝)。另外,由于本質上是多次 MapReduce 任務,調度也比較麻煩,實時性無從談起。
Spark 與 RDD 模型
針對上面的問題,如果能把中間結果保存在內存里,豈不是快的多?之所以不能這么做,最大的障礙是:分布式系統必須能容忍一定的故障,所謂 fault-tolerance。如果只是放在內存中,一旦某個計算節點宕機,其他節點無法恢復出丟失的數據,只能重啟整個計算任務,這對于動輒成百上千節點的集群來說是不可接受的。
一般來說,想做到 fault-tolerance 只有兩個方案:要么存儲到外部(例如 HDFS),要么拷貝到多個副本。Spark 大膽地提出了第三種——重算一遍。但是之所以能做到這一點,是依賴于一個額外的假設:所有計算過程都是確定性的(deterministic)。Spark 借鑒了函數式編程思想,提出了 RDD(Resilient Distributed Datasets),譯作“彈性分布式數據集”。
RDD 是一個只讀的、分區的(partitioned)數據集合。RDD 要么來源于不可變的外部文件(例如 HDFS 上的文件),要么由確定的算子由其他 RDD 計算得到。RDD 通過算子連接構成有向無環圖(DAG),上圖演示了一個簡單的例子,其中節點對應 RDD,邊對應算子。回到剛剛的問題,RDD 如何做到 fault-tolerance?很簡單,RDD 中的每個分區都能被確定性的計算出來,所以一旦某個分區丟失了,另一個計算節點可以從它的前繼節點出發、用同樣的計算過程重算一次,即可得到完全一樣的 RDD 分區。這個過程可以遞歸的進行下去。
▲ 上圖演示了 RDD 分區的恢復。為了簡潔并沒有畫出分區,實際上恢復是以分區為單位的。Spark 的編程接口和 Java 8 的 Stream 很相似:RDD 作為數據,在多種算子間變換,構成對執行計劃 DAG 的描述。最后,一旦遇到類似?collect()這樣的輸出命令,執行計劃會被發往 Spark 集群、開始計算。不難發現,算子分成兩類:
- map()、filter()、join() 等算子稱為 Transformation,它們輸入一個或多個 RDD,輸出一個 RDD。
- collect()、count()、save() 等算子稱為 Action,它們通常是將數據收集起來返回;
像之前提到的,RDD 的數據由多個分區(partition)構成,這些分區可以分布在集群的各個機器上,這也就是 RDD 中 “distributed” 的含義。熟悉 DBMS 的同學可以把 RDD 理解為邏輯執行計劃,partition 理解為物理執行計劃。
此外,RDD 還包含它的每個分區的依賴分區(dependency),以及一個函數指出如何計算出本分區的數據。Spark 的設計者發現,依賴關系依據執行方式的不同可以很自然地分成兩種:窄依賴(Narrow Dependency)和寬依賴(Wide Dependency),舉例來說:
- map()、filter() 等算子構成窄依賴:生產的每個分區只依賴父 RDD 中的一個分區。
- groupByKey() 等算子構成寬依賴:生成的每個分區依賴父 RDD 中的多個分區(往往是全部分區)。
在執行時,窄依賴可以很容易的按流水線(pipeline)的方式計算:對于每個分區從前到后依次代入各個算子即可。然而,寬依賴需要等待前繼 RDD 中所有分區計算完成;換句話說,寬依賴就像一個柵欄(barrier)會阻塞到之前的所有計算完成。整個計算過程被寬依賴分割成多個階段(stage),如上右圖所示。
了解 MapReduce 的同學可能已經發現,寬依賴本質上就是一個 MapReduce 過程。但是相比 MapReduce 自己寫 Map 和 Reduce 函數的編程接口,Spark 的接口要容易的多;并且在 Spark 中,多個階段的 MapReduce 只需要構造一個 DAG 即可。
聲明式接口:Spark SQL
Spark 誕生后,大幅簡化了 MapReduce 編程模型,但人們并不滿足于此。我們知道,與命令式(imperative)編程相對的是聲明式(declarative)編程,前者需要告訴程序怎樣得到我需要的結果,后者則是告訴程序我需要的結果是什么。舉例而言:你想知道,各個部門?<dept_id, dept_name>中性別為女?'female'的員工分別有多少?
命令式編程中
你需要編寫一個程序。下面給出了一種偽代碼實現:
employees = db.getAllEmployees() countByDept = dict() // 統計各部門女生人數 (dept_id -> count) for employee in employees: if (employee.gender == 'female') countByDept[employee.dept_id] += 1 results = list() // 加上 dept.name 列 depts = db.getAllDepartments() for dept in depts: if (countByDept containsKey dept.id) results.add(row(dept.id, dept.name, countByDept[dept.id])) return results;聲明式編程:
你只要用關系代數的運算表達出結果:
employees.join(dept, employees.deptId == dept.id) .where(employees.gender == 'female') .groupBy(dept.id, dept.name) .agg()
等價地,如果你更熟悉 SQL,也可以寫成這樣:
SELECTdept.id,dept.name,COUNT(*)FROMemployees?JOINdept?ONemployees.dept_id?==dept.idWHEREemployees.gender?='female'GROUPBYdept.id,dept.name
顯然,聲明式的要簡潔的多!但聲明式編程依賴于執行者產生真正的程序代碼,所以除了上面這段程序,還需要把數據模型(即 schema)一并告知執行者。聲明式編程最廣為人知的形式就是 SQL。Spark SQL 就是這樣一個基于 SQL 的聲明式編程接口。你可以將它看作在 Spark 之上的一層封裝,在 RDD 計算模型的基礎上,提供了 DataFrame API 以及一個內置的 SQL 執行計劃優化器 Catalyst。
▲ 上圖黃色部分是 Spark SQL 中新增的部分。DataFrame 就像數據庫中的表,除了數據之外它還保存了數據的 schema 信息。計算中,schema 信息也會經過算子進行相應的變換。DataFrame 的數據是行(row)對象組成的 RDD,對 DataFrame 的操作最終會變成對底層 RDD 的操作。
Catalyst 是一個內置的 SQL 優化器,負責把用戶輸入的 SQL 轉化成執行計劃。Catelyst 強大之處是它利用了 Scala 提供的代碼生成(codegen)機制,物理執行計劃經過編譯,產出的執行代碼效率很高,和直接操作 RDD 的命令式代碼幾乎沒有分別。
▲ 上圖是 Catalyst 的工作流程,與大多數 SQL 優化器一樣是一個 Cost-Based Optimizer (CBO),但最后使用代碼生成(codegen)轉化成直接對 RDD 的操作。流計算框架:Spark Streaming
以往,批處理和流計算被看作大數據系統的兩個方面。我們常常能看到這樣的架構——以 Kafka、Storm 為代表的流計算框架用于實時計算,而 Spark 或 MapReduce 則負責每天、每小時的數據批處理。在 ETL 等場合,這樣的設計常常導致同樣的計算邏輯被實現兩次,耗費人力不說,保證一致性也是個問題。
Spark Streaming 正是誕生于此類需求。傳統的流計算框架大多注重于低延遲,采用了持續的(continuous)算子模型;而 Spark Streaming 基于 Spark,另辟蹊徑提出了?D-Stream(Discretized Streams)方案:將流數據切成很小的批(micro-batch),用一系列的短暫、無狀態、確定性的批處理實現流處理。
Spark Streaming 的做法在流計算框架中很有創新性,它雖然犧牲了低延遲(一般流計算能做到 100ms 級別,Spark Streaming 延遲一般為 1s 左右),但是帶來了三個誘人的優勢:
- 更高的吞吐量(大約是 Storm 的 2-5 倍)
- 更快速的失敗恢復(通常只要 1-2s),因此對于 straggler(性能拖后腿的節點)直接殺掉即可
- 開發者只需要維護一套 ETL 邏輯即可同時用于批處理和流計算
你可能會困惑,流計算中的狀態一直是個難題。但我們剛剛提到 D-Stream 方案是無狀態的,那諸如 word count 之類的問題,怎么做到保持 count 算子的狀態呢?答案是通過 RDD:將前一個時間步的 RDD 作為當前時間步的 RDD 的前繼節點,就能造成狀態不斷更替的效果。實際上,新的狀態 RDD 總是不斷生成,而舊的 RDD 并不會被“替代”,而是作為新 RDD 的前繼依賴。對于底層的 Spark 框架來說,并沒有時間步的概念,有的只是不斷擴張的 DAG 圖和新的 RDD 節點。
▲ 上圖是流式計算 word count 的例子,count 結果在不同時間步中不斷累積。那么另一個問題也隨之而來:隨著時間的推進,上圖中的狀態 RDD?counts會越來越多,他的祖先(lineage)變得越來越長,極端情況下,恢復過程可能溯源到很久之前。這是不可接受的!因此,Spark Streming 會定期地對狀態 RDD 做 checkpoint,將其持久化到 HDFS 等存儲中,這被稱為 lineage cut,在它之前更早的 RDD 就可以沒有顧慮地清理掉了。
關于流行的幾個開源流計算框架的對比,可以參考文章?Comparison of Apache Stream Processing Frameworks。
流計算與 SQL:Spark Structured Streaming
Spark 通過 Spark Streaming 擁有了流計算能力,那 Spark SQL 是否也能具有類似的流處理能力呢?答案是肯定的,只要將數據流建模成一張不斷增長、沒有邊界的表,在這樣的語義之下,很多 SQL 操作等就能直接應用在流數據上。
出人意料的是,Spark Structured Streaming 的流式計算引擎并沒有復用 Spark Streaming,而是在 Spark SQL 上設計了新的一套引擎。因此,從 Spark SQL 遷移到 Spark Structured Streaming 十分容易,但從 Spark Streaming 遷移過來就要困難得多。
很自然的,基于這樣的模型,Spark SQL 中的大部分接口、實現都得以在 Spark Structured Streaming 中直接復用。將用戶的 SQL 執行計劃轉化成流計算執行計劃的過程被稱為增量化(incrementalize),這一步是由 Spark 框架自動完成的。對于用戶來說只要知道:每次計算的輸入是某一小段時間的流數據,而輸出是對應數據產生的計算結果。
▲ 左圖是 Spark Structured Streaming 模型示意圖;右圖展示了同一個任務的批處理、流計算版本,可以看到,除了輸入輸出不同,內部計算過程完全相同。與 Spark SQL 相比,流式 SQL 計算還有兩個額外的特性,分別是窗口(window)和水位(watermark)。窗口(window)是對過去某段時間的定義。批處理中,查詢通常是全量的(例如:總用戶量是多少);而流計算中,我們通常關心近期一段時間的數據(例如:最近24小時新增的用戶量是多少)。用戶通過選用合適的窗口來獲得自己所需的計算結果,常見的窗口有滑動窗口(Sliding Window)、滾動窗口(Tumbling Window)等。水位(watermark)用來丟棄過早的數據。在流計算中,上游的輸入事件可能存在不確定的延遲,而流計算系統的內存是有限的、只能保存有限的狀態,一定時間之后必須丟棄歷史數據。以雙流 A JOIN B 為例,假設窗口為 1 小時,那么 A 中比當前時間減 1 小時更早的數據(行)會被丟棄;如果 B 中出現 1 小時前的事件,因為無法處理只能忽略。
▲ 上圖為水位的示意圖,“遲到”太久的數據(行)由于已經低于當前水位無法處理,將被忽略。水位和窗口的概念都是因時間而來。在其他流計算系統中,也存在相同或類似的概念。
關于 SQL 的流計算模型,常常被拿來對比的還有另一個流計算框架?Apache Flink。與 Spark 相比,它們的實現思路有很大不同,但在模型上是很相似的。
系統架構
Spark 中有三個角色:Driver, Worker 和 Cluster Manager。
驅動程序(Driver)即用戶編寫的程序,對應一個?SparkContext,負責任務的構造、調度、故障恢復等。驅動程序可以直接運行在客戶端,例如用戶的應用程序中;也可以托管在 Master 上,這被稱為集群模式(cluster mode),通常用于流計算等長期任務。
Cluster Manager顧名思義負責集群的資源分配,Spark 自帶的 Spark Master 支持任務的資源分配,并包含一個 Web UI 用來監控任務運行狀況。多個 Master 可以構成一主多備,通過 ZooKeeper 進行協調和故障恢復。通常 Spark 集群使用 Spark Master 即可,但如果用戶的集群中不僅有 Spark 框架、還要承擔其他任務,官方推薦使用 Mesos 作為集群調度器。
Worker節點負責執行計算任務,上面保存了 RDD 等數據。
總結
Spark 是一個同時支持批處理和流計算的分布式計算系統。Spark 的所有計算均構建于 RDD 之上,RDD 通過算子連接形成 DAG 的執行計劃,RDD 的確定性及不可變性是 Spark 實現故障恢復的基礎。Spark Streaming 的 D-Stream 本質上也是將輸入數據分成一個個 micro-batch 的 RDD。
Spark SQL 是在 RDD 之上的一層封裝,相比原始 RDD,DataFrame API 支持數據表的 schema 信息,從而可以執行 SQL 關系型查詢,大幅降低了開發成本。Spark Structured Streaming 是 Spark SQL 的流計算版本,它將輸入的數據流看作不斷追加的數據行。
總結
以上是生活随笔為你收集整理的分布式离线计算—Spark—SparkStreaming的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 分布式实时计算—Spark—Spark
- 下一篇: Spring框架—基础介绍