Spark SQL JOIN操作代码示例
title: Spark SQL JOIN操作
date: 2021-05-08 15:53:21
tags:
- Spark
本文主要介紹 Spark SQL 的多表連接,需要預(yù)先準(zhǔn)備測(cè)試數(shù)據(jù)。分別創(chuàng)建員工和部門的 Datafame,并注冊(cè)為臨時(shí)視圖.
一、數(shù)據(jù)準(zhǔn)備
本文主要介紹 Spark SQL 的多表連接,需要預(yù)先準(zhǔn)備測(cè)試數(shù)據(jù)。分別創(chuàng)建員工和部門的 Datafame,并注冊(cè)為臨時(shí)視圖,代碼如下:
val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()val empDF = spark.read.json("/usr/file/json/emp.json") empDF.createOrReplaceTempView("emp")val deptDF = spark.read.json("/usr/file/json/dept.json") deptDF.createOrReplaceTempView("dept")兩表的主要字段如下:
emp 員工表|-- ENAME: 員工姓名|-- DEPTNO: 部門編號(hào)|-- EMPNO: 員工編號(hào)|-- HIREDATE: 入職時(shí)間|-- JOB: 職務(wù)|-- MGR: 上級(jí)編號(hào)|-- SAL: 薪資|-- COMM: 獎(jiǎng)金 dept 部門表|-- DEPTNO: 部門編號(hào)|-- DNAME: 部門名稱|-- LOC: 部門所在城市注:emp.json,dept.json 可以在本倉(cāng)庫(kù)的resources 目錄進(jìn)行下載。
二、連接類型
Spark 中支持多種連接類型:
Inner Join : 內(nèi)連接; Full Outer Join : 全外連接; Left Outer Join : 左外連接; Right Outer Join : 右外連接; Left Semi Join : 左半連接; Left Anti Join : 左反連接; Natural Join : 自然連接; Cross (or Cartesian) Join : 交叉 (或笛卡爾) 連接。其中內(nèi),外連接,笛卡爾積均與普通關(guān)系型數(shù)據(jù)庫(kù)中的相同,如下圖所示:
這里解釋一下左半連接和左反連接,這兩個(gè)連接等價(jià)于關(guān)系型數(shù)據(jù)庫(kù)中的 IN 和 NOT IN 字句:
-- LEFT SEMI JOIN SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno -- 等價(jià)于如下的 IN 語(yǔ)句 SELECT * FROM emp WHERE deptno IN (SELECT deptno FROM dept)-- LEFT ANTI JOIN SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno -- 等價(jià)于如下的 IN 語(yǔ)句 SELECT * FROM emp WHERE deptno NOT IN (SELECT deptno FROM dept)所有連接類型的示例代碼如下:
2.1 INNER JOIN
// 1.定義連接表達(dá)式 val joinExpression = empDF.col("deptno") === deptDF.col("deptno") // 2.連接查詢 empDF.join(deptDF,joinExpression).select("ename","dname").show()// 等價(jià) SQL 如下: spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show()2.2 FULL OUTER JOIN
empDF.join(deptDF, joinExpression, "outer").show() spark.sql("SELECT * FROM emp FULL OUTER JOIN dept ON emp.deptno = dept.deptno").show()2.3 LEFT OUTER JOIN
empDF.join(deptDF, joinExpression, "left_outer").show() spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.deptno = dept.deptno").show()2.4 RIGHT OUTER JOIN
empDF.join(deptDF, joinExpression, "right_outer").show() spark.sql("SELECT * FROM emp RIGHT OUTER JOIN dept ON emp.deptno = dept.deptno").show()2.5 LEFT SEMI JOIN
empDF.join(deptDF, joinExpression, "left_semi").show() spark.sql("SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno").show()2.6 LEFT ANTI JOIN
empDF.join(deptDF, joinExpression, "left_anti").show() spark.sql("SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno").show()2.7 CROSS JOIN
empDF.join(deptDF, joinExpression, "cross").show() spark.sql("SELECT * FROM emp CROSS JOIN dept ON emp.deptno = dept.deptno").show()2.8 NATURAL JOIN
自然連接是在兩張表中尋找那些數(shù)據(jù)類型和列名都相同的字段,然后自動(dòng)地將他們連接起來(lái),并返回所有符合條件的結(jié)果。
spark.sql("SELECT * FROM emp NATURAL JOIN dept").show()以下是一個(gè)自然連接的查詢結(jié)果,程序自動(dòng)推斷出使用兩張表都存在的 dept 列進(jìn)行連接,其實(shí)際等價(jià)于:
spark.sql("SELECT * FROM emp JOIN dept ON emp.deptno = dept.deptno").show()由于自然連接常常會(huì)產(chǎn)生不可預(yù)期的結(jié)果,所以并不推薦使用。
三、連接的執(zhí)行
在對(duì)大表與大表之間進(jìn)行連接操作時(shí),通常都會(huì)觸發(fā) Shuffle Join,兩表的所有分區(qū)節(jié)點(diǎn)會(huì)進(jìn)行 All-to-All 的通訊,這種查詢通常比較昂貴,會(huì)對(duì)網(wǎng)絡(luò) IO 會(huì)造成比較大的負(fù)擔(dān)。
而對(duì)于大表和小表的連接操作,Spark 會(huì)在一定程度上進(jìn)行優(yōu)化,如果小表的數(shù)據(jù)量小于 Worker Node 的內(nèi)存空間,Spark 會(huì)考慮將小表的數(shù)據(jù)廣播到每一個(gè) Worker Node,在每個(gè)工作節(jié)點(diǎn)內(nèi)部執(zhí)行連接計(jì)算,這可以降低網(wǎng)絡(luò)的 IO,但會(huì)加大每個(gè) Worker Node 的 CPU 負(fù)擔(dān)。
是否采用廣播方式進(jìn)行 Join 取決于程序內(nèi)部對(duì)小表的判斷,如果想明確使用廣播方式進(jìn)行 Join,則可以在 DataFrame API 中使用 broadcast 方法指定需要廣播的小表:
empDF.join(broadcast(deptDF), joinExpression).show()參考鏈接:
https://blog.csdn.net/m0_37809146/article/details/91282446
總結(jié)
以上是生活随笔為你收集整理的Spark SQL JOIN操作代码示例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: GIS基本知识学习PDF文档
- 下一篇: Spark弹性式数据集RDDs