spark中dataframe解析_Spark-SQL
生活随笔
收集整理的這篇文章主要介紹了
spark中dataframe解析_Spark-SQL
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
fe
缺點
- 不方便添加新的優化策略
- 線程安全問題
Spark SQL支持三種語言
- java
- Scala
- python
DataFrame
- 大規模數據化結構能歷、提高了運算能力
- 從sql到dataFrame的轉化,支持sql查詢
- RDD是分布式的java對象的集合,對象顳部結構不可知
- dataframe以rdd為基礎的分布式數據集,提供了詳細的結構信息
DataFrame的創建
SparkSession
dataframe的常用操作
df = spark.read.json("people.json") df.printSchema() 查看表的結構 df select(df['name'],df['age']+1).show()df.filter(df['age']>20).show() df.groupby("age").count().show()df.sort(df['age'] desc()).show() df.sort(df['age'].desc(),df['name'].asc()).show()利用反射機制推斷RDD的模式
讀取Mysql數據庫中的數據
DataFrame的創建
from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()# 分布式讀取文件 spark.read.text("people.txt") spark.read.format("text").load("people.txt") spark.read.json("people.json") spark.read.format("json").load("people.json") spark.read.parquet("people/parquet") spark.read.format("parquet").load("people.parquent")#文件保存 最后保到一個目錄 df.write.txt("people.txt") df.write.json("people.json") df.write.parquent("people.parquent")# dataFrame的一些常用操作df.printSchema() dat.select('_c1').show() df.filter(df['age']>20).show() df.groupby("age").count().show() df.sort(df["age"].desc()).show() df.sort(df["age"].desc(),df['name'].asc()).show()RDD轉換得到dataFrame
利用反射機制推斷RDD模式
#用ROW對象去封裝一行一行的數據 from pyspark.sql import ROW people = spark.sparkContext.textFile("file:///file_path").map(lambda x:x.split(",")).map(lambda x:ROW(NAME= P[0],age = int(p[1])))schemaPeople = spark.createDataFrame(people) #必須注冊為臨時表才供下面的查詢使用 schemaPeople.createOrReplaceTempView("people") personDF = spark.sql("select name,age from people where age>20") personsDRR = personsDF.rdd.map(lambda p:"Nmae"+p.name+","+age:"+str()p.age)) personsRDD.foreach(print)總結
以上是生活随笔為你收集整理的spark中dataframe解析_Spark-SQL的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php gmssl,centos7 ph
- 下一篇: 参商