spark sql 上个月_Spark学习之路 (十八)SparkSQL简单使用
一、SparkSQL的進化之路
1.0以前:
Shark
1.1.x開始:
SparkSQL(只是測試性的)? SQL
1.3.x:
SparkSQL(正式版本)+Dataframe
1.5.x:
SparkSQL 鎢絲計劃
1.6.x:
SparkSQL+DataFrame+DataSet(測試版本)
x:
SparkSQL+DataFrame+DataSet(正式版本)
SparkSQL:還有其他的優化
StructuredStreaming(DataSet)
二、認識SparkSQL
2.1 什么是SparkSQL?
spark SQL是spark的一個模塊,主要用于進行結構化數據的處理。它提供的最核心的編程抽象就是DataFrame。
2.2 SparkSQL的作用
提供一個編程抽象(DataFrame) 并且作為分布式 SQL?查詢引擎
DataFrame:它可以根據很多源進行構建,包括:結構化的數據文件,hive中的表,外部的關系型數據庫,以及RDD
2.3 運行原理
將?Spark SQL?轉化為?RDD,?然后提交到集群執行
2.4 特點
(1)容易整合
(2)統一的數據訪問方式
(3)兼容 Hive
(4)標準的數據連接
2.5 SparkSession
SparkSession是Spark 2.0引如的新概念。SparkSession為用戶提供了統一的切入點,來讓用戶學習spark的各項功能。
在spark的早期版本中,SparkContext是spark的主要切入點,由于RDD是主要的API,我們通過sparkcontext來創建和操作RDD。對于每個其他的API,我們需要使用不同的context。例如,對于Streming,我們需要使用StreamingContext;對于sql,使用sqlContext;對于Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點,SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向后兼容,SQLContext和HiveContext也被保存下來。
SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的。
特點:
----為用戶提供一個統一的切入點使用Spark?各項功能
----允許用戶通過它調用?DataFrame?和?Dataset?相關 API?來編寫程序
----減少了用戶需要了解的一些概念,可以很容易的與?Spark?進行交互
----與?Spark?交互之時不需要顯示的創建?SparkConf, SparkContext?以及 SQlContext,這些對象已經封閉在?SparkSession?中
2.7 DataFrames
在Spark中,DataFrame是一種以RDD為基礎的分布式數據集,類似于傳統數據庫中的二維表格。DataFrame與RDD的主要區別在于,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏于DataFrame背后的數據源以及作用于DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。反觀RDD,由于無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。
三、RDD轉換成為DataFrame
使用spark1.x版本的方式
測試數據目錄:/home/hadoop/apps/spark/examples/src/main/resources(spark的安裝目錄里面)
people.txt
3.1 方式一:通過?case class?創建?DataFrames(反射)
//定義case class,相當于表結構
case class People(var name:String,varage:Int)objectTestDataFrame1 {
def main(args: Array[String]): Unit={
val conf= new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
val sc= newSparkContext(conf)
val context= newSQLContext(sc)//將本地的數據讀入 RDD, 并將 RDD 與 case class 關聯
val peopleRDD = sc.textFile("E:\\666\\people.txt")
.map(line=> People(line.split(",")(0), line.split(",")(1).trim.toInt))
import context.implicits._//將RDD 轉換成 DataFrames
val df =peopleRDD.toDF//將DataFrames創建成一個臨時的視圖
df.createOrReplaceTempView("people")//使用SQL語句進行查詢
context.sql("select * from people").show()
}
}
運行結果
3.2 方式二:通過?structType?創建?DataFrames(編程接口)
objectTestDataFrame2 {
def main(args: Array[String]): Unit={
val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")
val sc= newSparkContext(conf)
val sqlContext= newSQLContext(sc)
val fileRDD= sc.textFile("E:\\666\\people.txt")//將 RDD 數據映射成 Row,需要 import org.apache.spark.sql.Row
val rowRDD: RDD[Row] = fileRDD.map(line =>{
val fields= line.split(",")
Row(fields(0), fields(1).trim.toInt)
})//創建 StructType 來定義結構
val structType: StructType =StructType(//字段名,字段類型,是否可以為空
StructField("name", StringType, true) ::
StructField("age", IntegerType, true) :: Nil
)/**
* rows: java.util.List[Row],
* schema: StructType
**/val df: DataFrame=sqlContext.createDataFrame(rowRDD,structType)
df.createOrReplaceTempView("people")
sqlContext.sql("select * from people").show()
}
}
運行結果
3.3 方式三:通過 json 文件創建?DataFrames
objectTestDataFrame3 {
def main(args: Array[String]): Unit={
val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")
val sc= newSparkContext(conf)
val sqlContext= newSQLContext(sc)
val df: DataFrame= sqlContext.read.json("E:\\666\\people.json")
df.createOrReplaceTempView("people")
sqlContext.sql("select * from people").show()
}
}
四、DataFrame的read和save和savemode
4.1 數據的讀取
objectTestRead {
def main(args: Array[String]): Unit={
val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")
val sc= newSparkContext(conf)
val sqlContext= newSQLContext(sc)//方式一
val df1 = sqlContext.read.json("E:\\666\\people.json")
val df2= sqlContext.read.parquet("E:\\666\\users.parquet")//方式二
val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")
val df4= sqlContext.read.format("parquet").load("E:\\666\\users.parquet")//方式三,默認是parquet格式
val df5 = sqlContext.load("E:\\666\\users.parquet")
}
}
4.2 數據的保存
objectTestSave {
def main(args: Array[String]): Unit={
val conf= new SparkConf().setAppName("TestDataFrame2").setMaster("local")
val sc= newSparkContext(conf)
val sqlContext= newSQLContext(sc)
val df1= sqlContext.read.json("E:\\666\\people.json")//方式一
df1.write.json("E:\\111")
df1.write.parquet("E:\\222")//方式二
df1.write.format("json").save("E:\\333")
df1.write.format("parquet").save("E:\\444")//方式三
df1.write.save("E:\\555")
}
}
4.3 數據的保存模式
使用mode
df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")
五、數據源
5.1 數據源只json
參考4.1
5.2 數據源之parquet
參考4.1
5.3 數據源之Mysql
objectTestMysql {
def main(args: Array[String]): Unit={
val conf= new SparkConf().setAppName("TestMysql").setMaster("local")
val sc= newSparkContext(conf)
val sqlContext= newSQLContext(sc)
val url= "jdbc:mysql://192.168.123.102:3306/hivedb"val table= "dbs"val properties= newProperties()
properties.setProperty("user","root")
properties.setProperty("password","root")//需要傳入Mysql的URL、表明、properties(連接數據庫的用戶名密碼)
val df =sqlContext.read.jdbc(url,table,properties)
df.createOrReplaceTempView("dbs")
sqlContext.sql("select * from dbs").show()
}
}
運行結果
5.4 數據源之Hive
(1)準備工作
在pom.xml文件中添加依賴
org.apache.spark
spark-hive_2.11
2.3.0
開發環境則把resource文件夾下添加hive-site.xml文件,集群環境把hive的配置文件要發到$SPARK_HOME/conf目錄下
javax.jdo.option.ConnectionURL
jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true
JDBC connect string for a JDBC metastore
javax.jdo.option.ConnectionDriverName
com.mysql.jdbc.Driver
Driver class name for a JDBC metastore
javax.jdo.option.ConnectionUserName
root
username to use against metastore database
javax.jdo.option.ConnectionPassword
root
password to use against metastore database
hive.metastore.warehouse.dir
/hive/warehouse
hive default warehouse, if nessecory, change it
(2)測試代碼
object TestHive {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
sqlContext.sql("select * from myhive.student").show()
}
}
運行結果
總結
以上是生活随笔為你收集整理的spark sql 上个月_Spark学习之路 (十八)SparkSQL简单使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android系统分区理解及分区目录细解
- 下一篇: SQL Server各个版本功能比较