2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
?
目錄
?RDD、DF、DS相關(guān)操作
SparkSQL初體驗(yàn)
SparkSession 應(yīng)用入口
獲取DataFrame/DataSet
使用樣例類
指定類型+列名
自定義Schema
???????RDD、DF、DS相互轉(zhuǎn)換
RDD、DF、DS相關(guān)操作
SparkSQL初體驗(yàn)
Spark 2.0開始,SparkSQL應(yīng)用程序入口為SparkSession,加載不同數(shù)據(jù)源的數(shù)據(jù),封裝到DataFrame/Dataset集合數(shù)據(jù)結(jié)構(gòu)中,使得編程更加簡單,程序運(yùn)行更加快速高效。
?
?
SparkSession 應(yīng)用入口
SparkSession:這是一個(gè)新入口,取代了原本的SQLContext與HiveContext。對(duì)于DataFrame API的用戶來說,Spark常見的混亂源頭來自于使用哪個(gè)“context”。現(xiàn)在使用SparkSession,它作為單個(gè)入口可以兼容兩者,注意原本的SQLContext與HiveContext仍然保留,以支持向下兼容。
文檔:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#starting-point-sparksession
?1)、添加MAVEN依賴
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.5</version></dependency>
?
?
2)、SparkSession對(duì)象實(shí)例通過建造者模式構(gòu)建,代碼如下:
?
?
其中①表示導(dǎo)入SparkSession所在的包,②表示建造者模式構(gòu)建對(duì)象和設(shè)置屬性,③表示導(dǎo)入SparkSession類中implicits對(duì)象object中隱式轉(zhuǎn)換函數(shù)。
?3)、范例演示:構(gòu)建SparkSession實(shí)例,加載文本數(shù)據(jù),統(tǒng)計(jì)條目數(shù)。
package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** Author itcast* Desc 演示SparkSQL*/
object SparkSQLDemo00_hello {def main(args: Array[String]): Unit = {//1.準(zhǔn)備SparkSQL開發(fā)環(huán)境println(this.getClass.getSimpleName)println(this.getClass.getSimpleName.stripSuffix("$"))val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val df1: DataFrame = spark.read.text("data/input/text")val df2: DataFrame = spark.read.json("data/input/json")val df3: DataFrame = spark.read.csv("data/input/csv")val df4: DataFrame = spark.read.parquet("data/input/parquet")df1.printSchema()df1.show(false)df2.printSchema()df2.show(false)df3.printSchema()df3.show(false)df4.printSchema()df4.show(false)df1.coalesce(1).write.mode(SaveMode.Overwrite).text("data/output/text")df2.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json")df3.coalesce(1).write.mode(SaveMode.Overwrite).csv("data/output/csv")df4.coalesce(1).write.mode(SaveMode.Overwrite).parquet("data/output/parquet")//關(guān)閉資源sc.stop()spark.stop()}
}
?
使用SparkSession加載數(shù)據(jù)源數(shù)據(jù),將其封裝到DataFrame或Dataset中,直接使用show函數(shù)就可以顯示樣本數(shù)據(jù)(默認(rèn)顯示前20條)。
Spark2.0使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口來實(shí)現(xiàn)其對(duì)數(shù)據(jù)加載、轉(zhuǎn)換、處理等功能。SparkSession實(shí)現(xiàn)了SQLContext及HiveContext所有功能。 SparkSession支持從不同的數(shù)據(jù)源加載數(shù)據(jù),并把數(shù)據(jù)轉(zhuǎn)換成DataFrame,并且支持把DataFrame轉(zhuǎn)換成SQLContext自身中的表,然后使用SQL語句來操作數(shù)據(jù)。SparkSession亦提供了HiveQL以及其他依賴于Hive的功能的支持。
?
獲取DataFrame/DataSet
?????實(shí)際項(xiàng)目開發(fā)中,往往需要將RDD數(shù)據(jù)集轉(zhuǎn)換為DataFrame,本質(zhì)上就是給RDD加上Schema信息,官方提供兩種方式:類型推斷和自定義Schema。
官方文檔:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#interoperating-with-rdds
?
?
???????使用樣例類
當(dāng)RDD中數(shù)據(jù)類型CaseClass樣例類時(shí),通過反射Reflecttion獲取屬性名稱和類型,構(gòu)建Schema,應(yīng)用到RDD數(shù)據(jù)集,將其轉(zhuǎn)換為DataFrame。
package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Author itcast* Desc 演示基于RDD創(chuàng)建DataFrame--使用樣例類*/
object CreateDataFrameDemo1 {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準(zhǔn)備環(huán)境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數(shù)據(jù)val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個(gè)Array)轉(zhuǎn)為樣例類(相當(dāng)于添加了Schema)val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉(zhuǎn)為DataFrame(DF)//注意:RDD的API中沒有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!import spark.implicits._val personDF: DataFrame = personRDD.toDF//6.查看約束personDF.printSchema()//7.查看分布式表中的數(shù)據(jù)集personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長的時(shí)候不會(huì)用...代替}
}
?
此種方式要求RDD數(shù)據(jù)類型必須為CaseClass,轉(zhuǎn)換的DataFrame中字段名稱就是CaseClass中屬性名稱。
???????指定類型+列名
除了上述兩種方式將RDD轉(zhuǎn)換為DataFrame以外,SparkSQL中提供一個(gè)函數(shù):toDF,通過指定列名稱,將數(shù)據(jù)類型為元組的RDD或Seq轉(zhuǎn)換為DataFrame,實(shí)際開發(fā)中也常常使用。
?
package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Author itcast* Desc 演示基于RDD創(chuàng)建DataFrame--使用類型加列名*/
object CreateDataFrameDemo2 {def main(args: Array[String]): Unit = {//1.準(zhǔn)備環(huán)境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數(shù)據(jù)val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個(gè)Array)轉(zhuǎn)為三元組(相當(dāng)于有了類型!)val personWithColumnsTypeRDD: RDD[(Int, String, Int)] = linesArrayRDD.map(arr=>(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉(zhuǎn)為DataFrame(DF)并指定列名//注意:RDD的API中沒有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!import spark.implicits._val personDF: DataFrame = personWithColumnsTypeRDD.toDF("id","name","age")//6.查看約束personDF.printSchema()//7.查看分布式表中的數(shù)據(jù)集personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長的時(shí)候不會(huì)用...代替}
}
?
???????自定義Schema
依據(jù)RDD中數(shù)據(jù)自定義Schema,類型為StructType,每個(gè)字段的約束使用StructField定義,具體步驟如下:
?第一步、RDD中數(shù)據(jù)類型為Row:RDD[Row];
?第二步、針對(duì)Row中數(shù)據(jù)定義Schema:StructType;
?第三步、使用SparkSession中方法將定義的Schema應(yīng)用到RDD[Row]上;
package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}/*** Author itcast* Desc 演示基于RDD創(chuàng)建DataFrame--使用StructType*/
object CreateDataFrameDemo3 {def main(args: Array[String]): Unit = {//1.準(zhǔn)備環(huán)境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數(shù)據(jù)val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個(gè)Array)轉(zhuǎn)為Rowval rowRDD: RDD[Row] = linesArrayRDD.map(arr=>Row(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉(zhuǎn)為DataFrame(DF)并指定列名//注意:RDD的API中沒有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!import spark.implicits._/*val schema: StructType = StructType(StructField("id", IntegerType, false) ::StructField("name", StringType, false) ::StructField("age", IntegerType, false) :: Nil)*/val schema: StructType = StructType(List(StructField("id", IntegerType, false),StructField("name", StringType, false),StructField("age", IntegerType, false)))val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)//6.查看約束personDF.printSchema()//7.查看分布式表中的數(shù)據(jù)集personDF.show(6,false)//false表示不截?cái)嗔忻?也就是列名很長的時(shí)候不會(huì)用...代替}}
此種方式可以更加體會(huì)到DataFrame = RDD[Row] + Schema組成,在實(shí)際項(xiàng)目開發(fā)中靈活的選擇方式將RDD轉(zhuǎn)換為DataFrame。
?
???????RDD、DF、DS相互轉(zhuǎn)換
實(shí)際項(xiàng)目開發(fā)中,常常需要對(duì)RDD、DataFrame及Dataset之間相互轉(zhuǎn)換,其中要點(diǎn)就是Schema約束結(jié)構(gòu)信息。
?1)、RDD轉(zhuǎn)換DataFrame或者Dataset
轉(zhuǎn)換DataFrame時(shí),定義Schema信息,兩種方式
轉(zhuǎn)換為Dataset時(shí),不僅需要Schema信息,還需要RDD數(shù)據(jù)類型為CaseClass類型
?2)、Dataset或DataFrame轉(zhuǎn)換RDD
由于Dataset或DataFrame底層就是RDD,所以直接調(diào)用rdd函數(shù)即可轉(zhuǎn)換
dataframe.rdd 或者dataset.rdd
?3)、DataFrame與Dataset之間轉(zhuǎn)換
由于DataFrame為Dataset特例,所以Dataset直接調(diào)用toDF函數(shù)轉(zhuǎn)換為DataFrame
當(dāng)將DataFrame轉(zhuǎn)換為Dataset時(shí),使用函數(shù)as[Type],指定CaseClass類型即可。
?
?
?
RDD、DataFrame和DataSet之間的轉(zhuǎn)換如下,假設(shè)有個(gè)樣例類:case?class?Emp(name:?String),相互轉(zhuǎn)換
RDD轉(zhuǎn)換到DataFrame:rdd.toDF(“name”)RDD轉(zhuǎn)換到Dataset:rdd.map(x => Emp(x)).toDSDataFrame轉(zhuǎn)換到Dataset:df.as[Emp]DataFrame轉(zhuǎn)換到RDD:df.rddDataset轉(zhuǎn)換到DataFrame:ds.toDFDataset轉(zhuǎn)換到RDD:ds.rdd
注意:
RDD與DataFrame或者DataSet進(jìn)行操作,都需要引入隱式轉(zhuǎn)換import spark.implicits._,其中的spark是SparkSession對(duì)象的名稱!
package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** Author itcast* Desc 演示基于RDD/DataFrame/DataSet三者之間的相互轉(zhuǎn)換*/
object TransformationDemo {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準(zhǔn)備環(huán)境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數(shù)據(jù)val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯(cuò)誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個(gè)Array)轉(zhuǎn)為樣例類(相當(dāng)于添加了Schema)val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉(zhuǎn)為DataFrame(DF)//注意:RDD的API中沒有toDF方法,需要導(dǎo)入隱式轉(zhuǎn)換!import spark.implicits._//轉(zhuǎn)換1:rdd-->dfval personDF: DataFrame = personRDD.toDF //注意:DataFrame沒有泛型//轉(zhuǎn)換2:rdd-->dsval personDS: Dataset[Person] = personRDD.toDS() //注意:Dataset具有泛型//轉(zhuǎn)換3:df-->rddval rdd: RDD[Row] = personDF.rdd //注意:DataFrame沒有泛型,也就是不知道里面是Person,所以轉(zhuǎn)為rdd之后統(tǒng)一的使用Row表示里面是很多行//轉(zhuǎn)換4:ds-->rddval rdd1: RDD[Person] = personDS.rdd //注意:Dataset具有泛型,所以轉(zhuǎn)為rdd之后還有原來泛型!//轉(zhuǎn)換5:ds-->dfval dataFrame: DataFrame = personDS.toDF()//轉(zhuǎn)換5:df-->dsval personDS2: Dataset[Person] = personDF.as[Person]//目前DataFrame和DataSet使用類似,如:也有show/createOrReplaceTempView/selectpersonDS.show()personDS.createOrReplaceTempView("t_person")personDS.select("name").show()}
}
?
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(二十三):S
- 下一篇: 2021年大数据Spark(二十六):S