快速理解Spark Dataset
1. 前言
RDD、DataFrame、Dataset是Spark三個最重要的概念,RDD和DataFrame兩個概念出現的比較早,Dataset相對出現的較晚(1.6版本開始出現),有些開發人員對此還不熟悉,本文重點引領快速理解Dataset。
帶著幾個問題去閱讀:
1、DataFrame比RDD有哪些優點?
2、DataFrame和Dataset有什么關系?
3、有了DataFrame為什么還有引入Dataset?
4、Dataset在Spark源碼中長什么樣?
注:本文的環境基于當前最新版本 Spark-2.1.1
2. RDD/DataFrame快速回顧
RDD
彈性分布式數據集,是Spark對數據進行的一種抽象,可以理解為Spark對數據的一種組織方式,更簡單些說,RDD就是一種數據結構,里面包含了數據和操作數據的方法
從字面上就能看出的幾個特點:
彈性:
- 數據可完全放內存或完全放磁盤,也可部分存放在內存,部分存放在磁盤,并可以自動切換
- RDD出錯后可自動重新計算(通過血緣自動容錯)
- 可checkpoint(設置檢查點,用于容錯),可persist或cache(緩存)
- 里面的數據是分片的(也叫分區,partition),分片的大小可自由設置和細粒度調整
分布式:
- RDD中的數據可存放在多個節點上
數據集:
- 數據的集合,沒啥好說的
相對于與DataFrame和Dataset,RDD是Spark最底層的抽象,目前是開發者用的最多的,但逐步會轉向DataFrame和Dataset(當然,這是Spark的發展趨勢)
DataFrame
DataFrame:理解了RDD,DataFrame就容易理解些,DataFrame的思想來源于Python的pandas庫,RDD是一個數據集,DataFrame在RDD的基礎上加了Schema(描述數據的信息,可以認為是元數據,DataFrame曾經就有個名字叫SchemaRDD)
假設RDD中的兩行數據長這樣
?
?
那么DataFrame中的數據長這樣
?
從上面兩個圖可以看出,DataFrame比RDD多了一個表頭信息(Schema),像一張表了,DataFrame還配套了新的操作數據的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...)。
有了DataFrame這個高一層的抽象后,我們處理數據更加簡單了,甚至可以用SQL來處理數據了,對開發者來說,易用性有了很大的提升。
不僅如此,通過DataFrame API或SQL處理數據,會自動經過Spark 優化器(Catalyst)的優化,即使你寫的程序或SQL不高效,也可以運行的很快,很爽吧!
注意:DataFrame是用來處理結構化數據的
3. 步入正文,Dataset
官方解釋如下(英語不好的同學隨意的瞄一眼即可):
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.
相對于RDD,Dataset提供了強類型支持,也是在RDD的每行數據加了類型約束
假設RDD中的兩行數據長這樣
那么Dataset中的數據長這樣
或者長這樣(每行數據是個Object)
使用Dataset API的程序,會經過Spark SQL的優化器進行優化(優化器叫什么還記得嗎?)
目前僅支持Scala、Java API,尚未提供Python的API(所以一定要學習Scala)
相比DataFrame,Dataset提供了編譯時類型檢查,對于分布式程序來講,提交一次作業太費勁了(要編譯、打包、上傳、運行),到提交到集群運行時才發現錯誤,實在是想罵人,這也是引入Dataset的一個重要原因。
使用DataFrame的代碼中json文件中并沒有score字段,但是能編譯通過,但是運行時會報異常!如下圖代碼所示
而使用Dataset實現,會在IDE中就報錯,出錯提前到了編譯之前
RDD轉換DataFrame后不可逆,但RDD轉換Dataset是可逆的(這也是Dataset產生的原因)。如下操作所示:
-
啟動spark-shell,創建一個RDD
? -
通過RDD創建DataFrame,再通過DataFrame轉換成RDD,發現RDD的類型變成了Row類型
- 通過RDD創建Dataset,再通過Dataset轉換為RDD,發現RDD還是原始類型
4. Dataset基本操作
將Spark安裝目錄的LICENSE文件上傳至HDFS上,將文件讀入Spark,使用as[]轉換為DataSet
使用Dataset API做轉換操作
創建臨時視圖,進行SQL操作
使用SQL進行單詞統計
使用SQL進行排名分析
5. Dataset源碼初探
如果不想本地搭建源碼閱讀環境,推薦一款在線閱讀源碼的工具insight,不需要本地環境,可以直接引用github中的代碼,非常方便.
Dataset的源碼位于sql目錄下:
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
由此可以看出Dataset是Spark SQL組件中的東西,另外DataFrame也是SparkSQL中的東西(這也是為什么Spark SQL是Spark生態中發展最迅猛的模塊)
上圖顯示了Dataset文件的結構,可以看出:
- 里面包含了一些變量,比如我們常用的sqlContext
- 里面有很多函數和算子,比如toDF、map等操作數據的算子
前面我們說了,Dataset是個組織數據的的結構,那么數據存儲在哪里呢?
- 主構造函數中需要傳遞三個參數
sparkSession:運行環境信息
queryExecution:數據和執行邏輯信息。注意,數據在這個參數中
encoder:編碼器,用于將JVM對象轉換為SparkSQL的對象(當然這里會有序列化和Schema等)
我們可以使用createDataset函數來創建一個Dataset,如上圖所示。
調用這個函數時,究竟發生了什么呢?
我們來看這個函數的實現:
第三個參數沒傳,使用了隱式的encoder(createDataset中的encoded變量取名不規范,容易混淆)
一張簡單的圖,總結了創建Dataset兩個最重要的步驟
作者:dingyuanpu
鏈接:https://www.jianshu.com/p/77811ae29fdd
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。
總結
以上是生活随笔為你收集整理的快速理解Spark Dataset的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark基本操作SparkSessio
- 下一篇: Spark中的键值对操作-scala