Window Functions in Spark SQL
The Databricks blog gives this overview of window functions:
Spark SQL supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions. The available ranking functions and analytic functions are summarized in the table below. For aggregate functions, users can use any existing aggregate function as a window function.
So there are three kinds of window functions:
the ranking and analytic functions are listed in the table below, and any existing aggregate function (sum, avg, max, min, ...) can also be used as a window function.
| Function Type | SQL | DataFrame API |
|--|--|--|
| Ranking | rank | rank |
| Ranking | dense_rank | denseRank |
| Ranking | percent_rank | percentRank |
| Ranking | ntile | ntile |
| Ranking | row_number | rowNumber |
| Analytic | cume_dist | cumeDist |
| Analytic | first_value | firstValue |
| Analytic | last_value | lastValue |
| Analytic | lag | lag |
| Analytic | lead | lead |
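Since any existing aggregate can serve as a window function, here is a one-query illustration (a sketch, assuming the `score` temp table and `hiveContext` set up in the case study below):

```scala
// avg() used as a window function: each row keeps its identity and also
// carries its lesson's average score alongside it.
hiveContext.sql(
  "select name, lesson, score, avg(score) over (partition by lesson) as avg_score from score"
).show
```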
Let's start with an example.
Sample data in `/root/score.json`: student name, lesson, and score.
{"name":"A","lesson":"Math","score":100} {"name":"B","lesson":"Math","score":100} {"name":"C","lesson":"Math","score":99} {"name":"D","lesson":"Math","score":98} {"name":"A","lesson":"E","score":100} {"name":"B","lesson":"E","score":99} {"name":"C","lesson":"E","score":99} {"name":"D","lesson":"E","score":98} ./spark-shell --master local #本地啟動spark-shell import org.apache.spark.sql.expressions.Windowimport org.apache.spark.sql.functions._import org.apache.spark.sql.hive.HiveContextsc.setLogLevel("WARN") // 日志級別,可不改val hiveContext = new HiveContext(sc)val df = hiveContext.read.json("file:///root/score.json")case class Score(val name: String, val lesson: String, val score: Int)df.registerTempTable("score") // 注冊臨時表// SQL語句val stat = "select".concat(" name,lesson,score, ").concat(" ntile(2) over (partition by lesson order by score desc ) as ntile_2,").concat(" ntile(3) over (partition by lesson order by score desc ) as ntile_3,").concat(" row_number() over (partition by lesson order by score desc ) as row_number,").concat(" rank() over (partition by lesson order by score desc ) as rank, ").concat(" dense_rank() over (partition by lesson order by score desc ) as dense_rank, ").concat(" percent_rank() over (partition by lesson order by score desc ) as percent_rank ").concat(" from score ").concat(" order by lesson,name,score")hiveContext.sql(stat).show // 執行語句得到的結果/*** 用DataFrame API的方式完成相同的功能。 **/val window_spec = Window.partitionBy("lesson").orderBy(df("score").desc) // 窗口函數中公用的子句df.select(df("name"), df("lesson"), df("score"),ntile(2).over(window_spec).as("ntile_2"),ntile(3).over(window_spec).as("ntile_3"),row_number().over(window_spec).as("row_number"),rank().over(window_spec).as("rank"),dense_rank().over(window_spec).as("dense_rank"),percent_rank().over(window_spec).as("percent_rank")).orderBy("lesson", "name", "score").show- 輸出結果完全一樣,如下表所示
| name | lesson | score | ntile_2 | ntile_3 | row_number | rank | dense_rank | percent_rank |
|--|--|--|--|--|--|--|--|--|
| A | E | 100 | 1 | 1 | 1 | 1 | 1 | 0.0 |
| B | E | 99 | 1 | 1 | 2 | 2 | 2 | 0.3333333333333333 |
| C | E | 99 | 2 | 2 | 3 | 2 | 2 | 0.3333333333333333 |
| D | E | 98 | 2 | 3 | 4 | 4 | 3 | 1.0 |
| A | Math | 100 | 1 | 1 | 1 | 1 | 1 | 0.0 |
| B | Math | 100 | 1 | 1 | 2 | 1 | 1 | 0.0 |
| C | Math | 99 | 2 | 2 | 3 | 3 | 2 | 0.6666666666666666 |
| D | Math | 98 | 2 | 3 | 4 | 4 | 3 | 1.0 |
- rank: tied values receive the same rank, so rank values may be non-consecutive (there are gaps after ties)
- dense_rank: tied values receive the same rank, but the rank values are always consecutive (no gaps)
- row_number: a plain row number, like Excel's, never repeated or gapped by ties
- percent_rank = (rank − 1) / (count(score) − 1) within the same partition (see the check after this list)
- ntile(n): splits the partition's ordered rows into n roughly equal contiguous buckets (earlier buckets receive the extra rows) and returns the 1-based bucket index
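To see the percent_rank formula play out on the data above, the same value can be computed by hand from rank and the partition row count (a quick check reusing `df` and `window_spec` from the case study; `manual_pr` is a name I made up):

```scala
// percent_rank by hand: (rank - 1) / (count - 1) within each lesson partition.
val partition_count = count("score").over(Window.partitionBy("lesson"))
val manual_pr = (rank().over(window_spec) - lit(1)).cast("double") / (partition_count - lit(1))

df.select(df("name"), df("lesson"), df("score"),
  percent_rank().over(window_spec).as("percent_rank"),
  manual_pr.as("manual_percent_rank") // for C in Math: (3 - 1) / (4 - 1) = 0.666...
).orderBy("lesson", "name").show
```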
- Comparing this with the Python DataFrame API calls in the official blog post, the Scala usage is almost identical to Python's. The official blog's address is in the references at the bottom.
In the example above, every row in a partition took part in the window computation. Now consider the scenario below, where aggregates and row-to-row comparisons are used as window functions (the query that produced this output is missing from the source; the header names below are inferred from the values, and a hedged reconstruction follows the table):
| name | lesson | score | diff_prev | min | score − min | max | score − max | avg | score − avg |
|--|--|--|--|--|--|--|--|--|--|
| A | E | 100 | 0 | 98 | 2 | 100 | 0 | 99.0 | 1.0 |
| B | E | 99 | -1 | 98 | 1 | 100 | -1 | 99.0 | 0.0 |
| C | E | 99 | 0 | 98 | 1 | 100 | -1 | 99.0 | 0.0 |
| D | E | 98 | -1 | 98 | 0 | 100 | -2 | 99.0 | -1.0 |
| A | Math | 100 | 0 | 98 | 2 | 100 | 0 | 99.25 | 0.75 |
| B | Math | 100 | 0 | 98 | 2 | 100 | 0 | 99.25 | 0.75 |
| C | Math | 99 | -1 | 98 | 1 | 100 | -1 | 99.25 | -0.25 |
| D | Math | 98 | -1 | 98 | 0 | 100 | -2 | 99.25 | -1.25 |
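Judging from the values, the columns mix whole-partition aggregates (min, max, avg and each row's distance from them) with a comparison against the previous row's score. A minimal sketch under those assumptions (all column names here are mine, and treating the first row's missing lag value as a zero difference is a guess):

```scala
// Hypothetical reconstruction of the missing query; values match the table above.
val by_lesson = Window.partitionBy("lesson")                           // whole-partition aggregates
val ordered   = Window.partitionBy("lesson").orderBy(df("score").desc) // row order within the partition

df.select(df("name"), df("lesson"), df("score"),
  // difference from the previous row's score; the table shows 0 (not null) on
  // the first row of each partition, so fall back to the row's own score
  (df("score") - coalesce(lag("score", 1).over(ordered), df("score"))).as("diff_prev"),
  min("score").over(by_lesson).as("min_score"),
  (df("score") - min("score").over(by_lesson)).as("above_min"),
  max("score").over(by_lesson).as("max_score"),
  (df("score") - max("score").over(by_lesson)).as("below_max"),
  avg("score").over(by_lesson).as("avg_score"),
  (df("score") - avg("score").over(by_lesson)).as("above_avg")
).orderBy("lesson", "name").show
```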
To be continued:
- A walkthrough of the analytic functions
- Source-code analysis
References:
Reposted from: https://my.oschina.net/corleone/blog/755393