2021年大数据Spark(三十二):SparkSQL的External DataSource
?
目錄
External DataSource
數據源與格式
text 數據
json 數據
csv 數據
parquet 數據
jdbc 數據
???????加載/保存數據-API
???????Load 加載數據
???????Save 保存數據
???????保存模式(SaveMode)
???????案例演示
External DataSource
在SparkSQL模塊,提供一套完成API接口,用于方便讀寫外部數據源的的數據(從Spark 1.4版本提供),框架本身內置外部數據源:
在Spark 2.4版本中添加支持Image Source(圖像數據源)和Avro Source。
數據源與格式
?????數據分析處理中,數據可以分為結構化數據、非結構化數據及半結構化數據。
??1)、結構化數據(Structured)
結構化數據源可提供有效的存儲和性能。例如,Parquet和ORC等柱狀格式使從列的子集中提取值變得更加容易。
基于行的存儲格式(如Avro)可有效地序列化和存儲提供存儲優勢的數據。然而,這些優點通常以靈活性為代價。如因結構的固定性,格式轉變可能相對困難。
?2)、非結構化數據(UnStructured)
相比之下,非結構化數據源通常是自由格式文本或二進制對象,其不包含標記或元數據以定義數據的結構。
報紙文章,醫療記錄,圖像,應用程序日志通常被視為非結構化數據。這些類型的源通常要求數據周圍的上下文是可解析的。
?3)、半結構化數據(Semi-Structured)
半結構化數據源是按記錄構建的,但不一定具有跨越所有記錄的明確定義的全局模式。每個數據記錄都使用其結構信息進行擴充。
半結構化數據格式的好處是,它們在表達數據時提供了最大的靈活性,因為每條記錄都是自我描述的。但這些格式的主要缺點是它們會產生額外的解析開銷,并且不是特別為ad-hoc(特定)查詢而構建的。
text 數據
SparkSession加載文本文件數據,提供兩種方法,返回值分別為DataFrame和Dataset,前面【WordCount】中已經使用,下面看一下方法聲明:
可以看出textFile方法底層還是調用text方法,先加載數據封裝到DataFrame中,再使用as[String]方法將DataFrame轉換為Dataset,實際中推薦使用textFile方法,從Spark 2.0開始提供。
無論是text方法還是textFile方法讀取文本數據時,一行一行的加載數據,每行數據使用UTF-8編碼的字符串,列名稱為【value】。?
json 數據
實際項目中,有時處理數據以JSON格式存儲的,尤其后續結構化流式模塊:StructuredStreaming,從Kafka Topic消費數據很多時間是JSON個數據,封裝到DataFrame中,需要解析提取字段的值。以讀取github操作日志JSON數據為例,數據結構如下:
?1)、操作日志數據使用GZ壓縮:2015-03-01-11.json.gz,先使用json方法讀取。
?2)、使用textFile加載數據,對每條JSON格式字符串數據,使用SparkSQL函數庫functions中自帶get_json_obejct函數提取字段:id、type、public和created_at的值。
函數:get_json_obejct使用說明
示例代碼:
package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** SparkSQL讀取JSON格式文本數據*/
object SparkSQLJson {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 從LocalFS上讀取json格式數據(壓縮)val jsonDF: DataFrame = spark.read.json("data/input/2015-03-01-11.json.gz")//jsonDF.printSchema()jsonDF.show(5, truncate = true)println("===================================================")val githubDS: Dataset[String] = spark.read.textFile("data/input/2015-03-01-11.json.gz")//githubDS.printSchema() // value 字段名稱,類型就是StringgithubDS.show(5,truncate = true)// TODO:使用SparkSQL自帶函數,針對JSON格式數據解析的函數import org.apache.spark.sql.functions._// 獲取如下四個字段的值:id、type、public和created_atval gitDF: DataFrame = githubDS.select(get_json_object($"value", "$.id").as("id"),get_json_object($"value", "$.type").as("type"),get_json_object($"value", "$.public").as("public"),get_json_object($"value", "$.created_at").as("created_at"))gitDF.printSchema()gitDF.show(10, truncate = false)// 應用結束,關閉資源spark.stop()}
}
運行結果:
???????csv 數據
在機器學習中,常常使用的數據存儲在csv/tsv文件格式中,所以SparkSQL中也支持直接讀取格式數據,從2.0版本開始內置數據源。關于CSV/TSV格式數據說明:
SparkSQL中讀取CSV格式數據,可以設置一些選項,重點選項:
?1)、分隔符:sep
默認值為逗號,必須單個字符
?2)、數據文件首行是否是列名稱:header
默認值為false,如果數據文件首行是列名稱,設置為true
?3)、是否自動推斷每個列的數據類型:inferSchema
默認值為false,可以設置為true
官方提供案例:
當讀取CSV/TSV格式數據文件首行是否是列名稱,讀取數據方式(參數設置)不一樣的 。
?第一點:首行是列的名稱,如下方式讀取數據文件
???????//?TODO:?讀取TSV格式數據val?ratingsDF:?DataFrame?=?spark.read//?設置每行數據各個字段之間的分隔符,?默認值為?逗號.option("sep",?"\t")//?設置數據文件首行為列名稱,默認值為?false.option("header",?"true")//?自動推薦數據類型,默認值為false.option("inferSchema",?"true")//?指定文件的路徑.csv("datas/ml-100k/u.dat")ratingsDF.printSchema()ratingsDF.show(10,?truncate?=?false)
?第二點:首行不是列的名稱,如下方式讀取數據(設置Schema信息)
??????//?定義Schema信息val?schema?=?StructType(StructField("user_id",?IntegerType,?nullable?=?true)?::StructField("movie_id",?IntegerType,?nullable?=?true)?::StructField("rating",?DoubleType,?nullable?=?true)?::StructField("timestamp",?StringType,?nullable?=?true)?::?Nil)//?TODO:?讀取TSV格式數據val?mlRatingsDF:?DataFrame?=?spark.read//?設置每行數據各個字段之間的分隔符,?默認值為?逗號.option("sep",?"\t")//?指定Schema信息.schema(schema)//?指定文件的路徑.csv("datas/ml-100k/u.data")mlRatingsDF.printSchema()mlRatingsDF.show(5,?truncate?=?false)
?????將DataFrame數據保存至CSV格式文件,演示代碼如下:
示例代碼??
??????/***?將電影評分數據保存為CSV格式數據*/mlRatingsDF//?降低分區數,此處設置為1,將所有數據保存到一個文件中.coalesce(1).write//?設置保存模式,依據實際業務場景選擇,此處為覆寫.mode(SaveMode.Overwrite).option("sep",?",")//?TODO:?建議設置首行為列名.option("header",?"true").csv("datas/ml-csv-"?+?System.nanoTime())
package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** SparkSQL 讀取CSV/TSV格式數據:* i). 指定Schema信息* ii). 是否有header設置*/
object SparkSQLCsv {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._/*** 實際企業數據分析中* csv\tsv格式數據,每個文件的第一行(head, 首行),字段的名稱(列名)*/// TODO: 讀取CSV格式數據val ratingsDF: DataFrame = spark.read// 設置每行數據各個字段之間的分隔符, 默認值為 逗號.option("sep", "\t")// 設置數據文件首行為列名稱,默認值為?false.option("header", "true")// 自動推薦數據類型,默認值為false.option("inferSchema", "true")// 指定文件的路徑.csv("data/input/rating_100k_with_head.data")ratingsDF.printSchema()ratingsDF.show(10, truncate = false)println("=======================================================")// 定義Schema信息val schema = StructType(StructField("user_id", IntegerType, nullable = true) ::StructField("movie_id", IntegerType, nullable = true) ::StructField("rating", DoubleType, nullable = true) ::StructField("timestamp", StringType, nullable = true) :: Nil)// TODO: 讀取CSV格式數據val mlRatingsDF: DataFrame = spark.read// 設置每行數據各個字段之間的分隔符, 默認值為 逗號.option("sep", "\t")// 指定Schema信息.schema(schema)// 指定文件的路徑.csv("data/input/rating_100k.data")mlRatingsDF.printSchema()mlRatingsDF.show(10, truncate = false)println("=======================================================")/*** 將電影評分數據保存為CSV格式數據*/mlRatingsDF// 降低分區數,此處設置為1,將所有數據保存到一個文件中.coalesce(1).write// 設置保存模式,依據實際業務場景選擇,此處為覆寫.mode(SaveMode.Overwrite).option("sep", ",")// TODO: 建議設置首行為列名.option("header", "true").csv("data/output/ml-csv-" + System.currentTimeMillis())// 關閉資源spark.stop()}}
???????parquet 數據
SparkSQL模塊中默認讀取數據文件格式就是parquet列式存儲數據,通過參數【spark.sql.sources.default】設置,默認值為【parquet】。
示例代碼:
直接load加載parquet數據和指定parquet格式加載數據。
運行程序結果:
package cn.it.sqlimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}/*** SparkSQL讀取Parquet列式存儲數據*/
object SparkSQLParquet {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// 通過裝飾模式獲取實例對象,此種方式為線程安全的.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// TODO: 從LocalFS上讀取parquet格式數據val usersDF: DataFrame = spark.read.parquet("data/input/users.parquet")usersDF.printSchema()usersDF.show(10, truncate = false)println("==================================================")// SparkSQL默認讀取文件格式為parquetval df = spark.read.load("data/input/users.parquet")df.printSchema()df.show(10, truncate = false)// 應用結束,關閉資源spark.stop()}
}
???????jdbc 數據
回顧在SparkCore中讀取MySQL表的數據通過JdbcRDD來讀取的,在SparkSQL模塊中提供對應接口,提供三種方式讀取數據:
?方式一:單分區模式
?方式二:多分區模式,可以設置列的名稱,作為分區字段及列的值范圍和分區數目
?方式三:高度自由分區模式,通過設置條件語句設置分區數據及各個分區數據范圍
當加載讀取RDBMS表的數據量不大時,可以直接使用單分區模式加載;當數據量很多時,考慮使用多分區及自由分區方式加載。
從RDBMS表中讀取數據,需要設置連接數據庫相關信息,基本屬性選項如下:
演示代碼如下:
//?連接數據庫三要素信息val?url:?String?=?"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"val?table:?String?=?"db_shop.so"//?存儲用戶和密碼等屬性val?props:?Properties?=?new?Properties()props.put("driver",?"com.mysql.cj.jdbc.Driver")props.put("user",?"root")props.put("password",?"123456")//?TODO:?從MySQL數據庫表:銷售訂單表?so//?def?jdbc(url:?String,?table:?String,?properties:?Properties):?DataFrameval?sosDF:?DataFrame?=?spark.read.jdbc(url,?table,?props)println(s"Count?=?${sosDF.count()}")sosDF.printSchema()sosDF.show(10,?truncate?=?false)
可以使用option方法設置連接數據庫信息,而不使用Properties傳遞,代碼如下:
//?TODO:?使用option設置參數val?dataframe:?DataFrame?=?spark.read.format("jdbc").option("driver",?"com.mysql.cj.jdbc.Driver").option("url",?"jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true").option("user",?"root").option("password",?"123456").option("dbtable",?"db_shop.so").load()dataframe.show(5,?truncate?=?false)
???????加載/保存數據-API
????SparkSQL提供一套通用外部數據源接口,方便用戶從數據源加載和保存數據,例如從MySQL表中既可以加載讀取數據:load/read,又可以保存寫入數據:save/write。
由于SparkSQL沒有內置支持從HBase表中加載和保存數據,但是只要實現外部數據源接口,也能像上面方式一樣讀取加載數據。
???????Load 加載數據
在SparkSQL中讀取數據使用SparkSession讀取,并且封裝到數據結構Dataset/DataFrame中。
DataFrameReader專門用于加載load讀取外部數據源的數據,基本格式如下:
SparkSQL模塊本身自帶支持讀取外部數據源的數據:
總結起來三種類型數據,也是實際開發中常用的:
?第一類:文件格式數據
文本文件text、csv文件和json文件
?第二類:列式存儲數據
Parquet格式、ORC格式
?第三類:數據庫表
關系型數據庫RDBMS:MySQL、DB2、Oracle和MSSQL
Hive倉庫表
官方文檔:http://spark.apache.org/docs/2.4.5/sql-data-sources-load-save-functions.html
此外加載文件數據時,可以直接使用SQL語句,指定文件存儲格式和路徑:
???????Save 保存數據
SparkSQL模塊中可以從某個外部數據源讀取數據,就能向某個外部數據源保存數據,提供相應接口,通過DataFrameWrite類將數據進行保存。
與DataFrameReader類似,提供一套規則,將數據Dataset保存,基本格式如下:
SparkSQL模塊內部支持保存數據源如下:
所以使用SpakrSQL分析數據時,從數據讀取,到數據分析及數據保存,鏈式操作,更多就是ETL操作。當將結果數據DataFrame/Dataset保存至Hive表中時,可以設置分區partition和分桶bucket,形式如下:
???????保存模式(SaveMode)
?????將Dataset/DataFrame數據保存到外部存儲系統中,考慮是否存在,存在的情況下的下如何進行保存,DataFrameWriter中有一個mode方法指定模式:
通過源碼發現SaveMode時枚舉類,使用Java語言編寫,如下四種保存模式:
?第一種:Append 追加模式,當數據存在時,繼續追加;
?第二種:Overwrite 覆寫模式,當數據存在時,覆寫以前數據,存儲當前最新數據;
?第三種:ErrorIfExists?存在及報錯;
?第四種:Ignore 忽略,數據存在時不做任何操作;
實際項目依據具體業務情況選擇保存模式,通常選擇Append和Overwrite模式。
???????案例演示
package cn.it.sqlimport java.util.Propertiesimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** Author itcast* Desc 先準備一個df/ds,然后再將該df/ds的數據寫入到不同的數據源中,最后再從不同的數據源中讀取*/
object DataSourceDemo{case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準備環境-SparkSession和DFval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val lines: RDD[String] = sc.textFile("data/input/person.txt")val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))import spark.implicits._val personDF: DataFrame = personRDD.toDFpersonDF.show(6,false)/*+---+--------+---+|id |name ???|age|+---+--------+---+|1 ?|zhangsan|20 ||2 ?|lisi ???|29 ||3 ?|wangwu ?|25 ||4 ?|zhaoliu |30 ||5 ?|tianqi ?|35 ||6 ?|kobe ???|40 |+---+--------+---+*///2.將personDF寫入到不同的數據源personDF.write.mode(SaveMode.Overwrite).json("data/output/json")personDF.write.mode(SaveMode.Overwrite).csv("data/output/csv")personDF.write.mode(SaveMode.Overwrite).parquet("data/output/parquet")val prop = new Properties()prop.setProperty("user","root")prop.setProperty("password","root")personDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)println("寫入成功!")//personDF.write.text("data/output/text")//會報錯, Text data source supports only a single column, and you have 3 columns.personDF.coalesce(1).write.mode(SaveMode.Overwrite).json("data/output/json1")//personDF.repartition(1)//3.從不同的數據源讀取數據val df1: DataFrame = spark.read.json("data/output/json")val df2: DataFrame = spark.read.csv("data/output/csv").toDF("id_my","name","age")val df3: DataFrame = spark.read.parquet("data/output/parquet")val df4: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)df1.show()df2.show()df3.show()df4.show()}
}
總結
以上是生活随笔為你收集整理的2021年大数据Spark(三十二):SparkSQL的External DataSource的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(三十一):S
- 下一篇: 2021年大数据Spark(三十三):S