040 DataFrame中的write与read编程
一:SparkSQL支持的外部數(shù)據(jù)源
1.支持情況
?
2.External LIbraries
不是內(nèi)嵌的,看起來不支持。
但是現(xiàn)在已經(jīng)有很多開源插件,可以進(jìn)行支持。
?
3.參考材料
· 支持的格式:https://github.com/databricks
?
二:準(zhǔn)備
1.啟動(dòng)服務(wù)
RunJar是metastore服務(wù),在hive那邊開啟。
只需要啟動(dòng)三個(gè)服務(wù)就可以了,以后runjar都要啟動(dòng),因?yàn)檫@里使用hive與spark集成了,不啟動(dòng)這個(gè)服務(wù),就會(huì)總是報(bào)錯(cuò)。
?
2.啟動(dòng)spark-shell
?
三:測試檢驗(yàn)程序
1.DataFrame的構(gòu)成
?
2.結(jié)果
?
3.測試
?
?
4.結(jié)果
?
四:DataFrame的創(chuàng)建
1.創(chuàng)建SQLContext
val sqlContext=new SQLContext(sc)
2.創(chuàng)建DataFrame(兩種方式)
val df=sqlContext.#
val df=sqlContext.read.#
3.DataFrame數(shù)據(jù)轉(zhuǎn)換
val ndf=df.#.#
4.結(jié)果保存
ndf.#
ndf.write.#
?
五:DataFrame的保存
1.第一種方式
將DataFrame轉(zhuǎn)換為RDD,RDD數(shù)據(jù)保存
?
2.第二種方式
直接通過DataFrame的write屬性將數(shù)據(jù)寫出。
但是有限制,必須有定義類實(shí)現(xiàn),默認(rèn)情況:SparkSQL只支持parquet,json,jdbc
?
六:兩個(gè)常用的網(wǎng)站(數(shù)據(jù)源問題)
1.金磚公司提供的一些插件
?
2.package網(wǎng)址
https://spark-packages.org/
?
七:DataFrameReader編程模式
功能: 通過SQLContext提供的reader讀取器讀取外部數(shù)據(jù)源的數(shù)據(jù),并形成DataFrame
1.源碼的主要方法
format:給定數(shù)據(jù)源數(shù)據(jù)格式類型,eg: json、parquet
schema:給定讀入數(shù)據(jù)的數(shù)據(jù)schema,可以不給定,不給定的情況下,進(jìn)行數(shù)據(jù)類型推斷
option:添加參數(shù),這些參數(shù)在數(shù)據(jù)解析的時(shí)候可能會(huì)用到
load:
有參數(shù)的指從參數(shù)給定的path路徑中加載數(shù)據(jù),比如:JSON、Parquet...
無參數(shù)的指直接加載數(shù)據(jù)(根據(jù)option相關(guān)的參數(shù))
jdbc:讀取關(guān)系型數(shù)據(jù)庫的數(shù)據(jù)
json:讀取json格式數(shù)據(jù)
parquet:讀取parquet格式數(shù)據(jù)
orc: 讀取orc格式數(shù)據(jù)
table:直接讀取關(guān)聯(lián)的Hive數(shù)據(jù)庫中的對(duì)應(yīng)表數(shù)據(jù)
?
八:Reader的程序測試
1.新建文件夾
?
2.上傳數(shù)據(jù)
?
3.加載json數(shù)據(jù)
val df=sqlContext.read.format("json").load("spark/sql/people.json")
結(jié)果:
?
4.數(shù)據(jù)展示
df.show()
結(jié)果:
?
5.數(shù)據(jù)注冊成臨時(shí)表并操作展示
結(jié)果:
?
6.和上面的方法等效的方式
sqlContext.sql("select * from json.`spark/sql/people.json`").show()
結(jié)果:
?
7.讀取顯示parquet格式的數(shù)據(jù)
sqlContext.read.format("parquet").load("spark/sql/users.parquet").show()
結(jié)果:
?
8.加載mysql中的數(shù)據(jù)
這個(gè)是服務(wù)器上的mysql。
sqlContext.read.jdbc("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/mysql?user=root&password=123456", "user", new java.util.Properties()).show()
這個(gè)地方比較特殊。
第一次使用bin/spark-shell進(jìn)入后,使用命令,效果如下:
然后使用這種方式進(jìn)行啟動(dòng),加上jar
?bin/spark-shell --jars /opt/softwares/mysql-connector-java-5.1.27-bin.jar --driver-class-path /opt/softwares/mysql-connector-java-5.1.27-bin.jar
?
九:DataFrameWriter編程模式
功能:將DataFrame的數(shù)據(jù)寫出到外部數(shù)據(jù)源
?
1.源碼主要方法
mode: 給定數(shù)據(jù)輸出的模式
`overwrite`: overwrite the existing data.
`append`: append the data.
`ignore`: ignore the operation (i.e. no-op).
`error`: default option, throw an exception at runtime.
format:給定輸出文件所屬類型, eg: parquet、json
option: 給定參數(shù)
partitionBy:給定分區(qū)字段(要求輸出的文件類型支持?jǐn)?shù)據(jù)分區(qū))
save: 觸發(fā)數(shù)據(jù)保存操作 --> 當(dāng)該API被調(diào)用后,數(shù)據(jù)已經(jīng)寫出到具體的數(shù)據(jù)保存位置了
jdbc:將數(shù)據(jù)輸出到關(guān)系型數(shù)據(jù)庫
當(dāng)mode為append的時(shí)候,數(shù)據(jù)追加方式是:
先將表中的所有索引刪除
再追加數(shù)據(jù)
沒法實(shí)現(xiàn),數(shù)據(jù)不存在就添加,存在就更新的需求
?
十:writer的程序測試
?1.讀取hive數(shù)據(jù),形成DateFrame
?
2.結(jié)果保存為json格式
自動(dòng)創(chuàng)建存儲(chǔ)目錄。
效果:
?
3.不再詳細(xì)粘貼結(jié)果了
1 讀取Hive表數(shù)據(jù)形成DataFrame 2 val df = sqlContext.read.table("common.emp") 3 4 結(jié)果保存json格式 5 df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json") 6 df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json") 7 df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json") 8 df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\ 9 上面雖然在追加的時(shí)候加上了sal,但是解析沒有問題 10 sqlContext.read.format("json").load("/beifeng/result/json").show() 11 12 結(jié)果保存parquet格式 13 df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01") 14 df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal導(dǎo)致解析失敗,讀取數(shù)據(jù)的時(shí)候 15 16 sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100) 17 sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100) 18 19 partitionBy按照給定的字段進(jìn)行分區(qū) 20 df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02") 21 sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)?
?
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/juncaoit/p/6777648.html
總結(jié)
以上是生活随笔為你收集整理的040 DataFrame中的write与read编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Boot由jar包转成wa
- 下一篇: 201521123060 《Java程序