SparkCore基础
目錄
?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Spark簡介
1?什么是Spark
2?Spark特點
3?Spark分布式環境安裝
3.1?Spark HA的環境安裝
3.2?動態增刪一個worker節點到集群
4?Spark核心概念
5 Spark案例
5.2? Master URL
5.3?spark日志的管理
5.4?WordCount案例程序的執行過程
6 Spark作業運行架構圖(standalone模式)
7?RDD操作
7.1?RDD初始化
7.2?RDD操作
7.3?transformation轉換算子
7.3?action行動算子
8?高級排序
8.1?普通的排序
8.2 二次排序
8.3?分組TopN
8.4?優化分組TopN
9?持久化操作
9.1 為什要持久化
9.2?如何進行持久化
9.3?持久化策略
9.4?如何選擇持久化策略
10 共享變量
10.1 概述
10.2?broadcast廣播變量
10.3?accumulator累加器
10.4?自定義累加器
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?SparkCore基礎
1?什么是Spark
Spark是一個通用的可擴展的處理海量數據集的計算引擎。
Spark集成離線計算,實時計算,SQL查詢,機器學習,圖計算為一體的通用的計算框架。
2?Spark特點
(1)快:相比給予MR,官方表明基于內存計算spark要快mr100倍,基于磁盤計算spark要快mr10倍
快的原因:①基于內存計算,②計算和數據的分離 ③基于DAGScheduler的計算劃分 ④只有一次的Shuffle輸出操作
(2)易用:Spark提供超過80多個高階算子函數,來支持對數據集的各種各樣的計算,使用的時候,可以使用java、scala、python、R,非常靈活易用。
(3)通用:在一個項目中,既可以使用離線計算,也可以使用其他比如,SQL查詢,機器學習,圖計算等等,而這時Spark最強大的優勢
(4)到處運行
3?Spark分布式環境安裝
(1)下載解壓,添加環境變量
(2)修改配置文件
spark的配置文件,在$SPARK_HOME/conf目錄下
①拷貝slaves和spark-env.sh文件 :cp slaves.template slaves和cp spark-env.sh.template spark-env.sh
②修改slaves配置,配置spark的從節點的主機名,spark中的從節點叫做worker,主節點叫做Master。vim slaves
bigdata02 bigdata03③修改spark-env.sh文件,添加如下內容
export JAVA_HOME=/opt/jdk export SCALA_HOME=/home/refuel/opt/mouldle/scala export SPARK_MASTER_IP=bigdata01 export SPARK_MASTER_PORT=7077 ##rpc通信端口,類似hdfs的9000端口,不是50070 export SPARK_WORKER_CORES=2 export SPARK_WORKER_INSTANCES=1 export SPARK_WORKER_MEMORY=1g export HADOOP_CONF_DIR=/home/refuel/opt/mouldle/hadoop/etc/hadoop(3)同步spark到其它節點中
3.1?Spark HA的環境安裝
有兩種方式解決單點故障,一種基于文件系統FileSystem(生產中不用),還有一種基于Zookeeper(使用)。 配置基于Zookeeper的一個ha是非常簡單的,只需要在spark-env.sh中添加一句話即可。
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir=/spark"spark.deploy.recoveryMode設置成 ZOOKEEPER spark.deploy.zookeeper.urlZooKeeper URL spark.deploy.zookeeper.dir ZooKeeper 保存恢復狀態的目錄,缺省為 /spark。因為ha不確定master在bigdata01上面啟動,所以將export SPARK_MASTER_IP=bigdata01和export SPARK_MASTER_PORT=7077注釋掉
3.2?動態增刪一個worker節點到集群
(1)上線一個節點:不需要在現有集群的配置上做任何修改,只需要準備一臺worker機器即可,可和之前的worker的配置相同。
(2)下線一個節點:kill或者stop-slave.sh都可以
4?Spark核心概念
ClusterManager:在Standalone(依托于spark集群本身)模式中即為Master(主節點),控制整個集群,監控Worker。在YARN模式中為資源管理器ResourceManager。
Worker:從節點,負責控制計算節點,啟動Executor。在YARN模式中為NodeManager,負責計算節點的控制,啟動的進程叫Container。
Driver:運行Application的main()函數并創建SparkContext(是spark中最重要的一個概念,是spark編程的入口,作用相當于mr中的Job)。
Executor:執行器,在worker node上執行任務的組件、用于啟動線程池運行任務。每個Application擁有獨立的一組Executors。
SparkContext:整個應用的上下文,控制應用的生命周期,是spark編程的入口。
RDD:彈性式分布式數據集。Spark的基本計算單元,一組RDD可形成執行的有向無環圖RDD Graph。
DAGScheduler:實現將Spark作業分解成一到多個Stage,每個Stage根據RDD的Partition個數決定Task的個數,然后生成相應的Task set放到TaskScheduler中。 DAGScheduler就是Spark的大腦,中樞神經
TaskScheduler:將任務(Task)分發給Executor執行。
Stage:一個Spark作業一般包含一到多個Stage。
Task :一個Stage包含一到多個Task,通過多個Task實現并行運行的功能。 task的個數由rdd的partition分區決定
Transformations:轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉換生成另一個RDD的操作不是馬上執行,Spark在遇到Transformations操作時只會記錄需要這樣的操作,并不會去執行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。
Actions:操作/行動(Actions)算子 (如:count, collect, foreach等),Actions操作會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啟動計算的動因。
SparkEnv:線程級別的上下文,存儲運行時的重要組件的引用。SparkEnv內創建并包含如下一些重要組件的引用。
MapOutPutTracker:負責Shuffle元信息的存儲
BroadcastManager:負責廣播變量的控制與元信息的存儲。
BlockManager:負責存儲管理、創建和查找塊。
MetricsSystem:監控運行時性能指標信息。
SparkConf:負責存儲配置信息。作用相當于hadoop中的Configuration。
5 Spark案例
pom文件的依賴配置如下
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><!-- scala去除,因為spark-core包里有了scala的依賴<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency> --><!-- sparkcore --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.2</version></dependency><!-- sparksql --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.2</version></dependency><!-- sparkstreaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.2</version></dependency></dependencies>注意:入口類為SparkContext,java版本的是JavaSparkContext,scala的版本就是SparkContext;SparkSQL的入口有SQLContext、HiveContext;SparkStreaming的入口又是StreamingContext。
java版本
public class JavaSparkWordCountOps {public static void main(String[] args) {//step 1、創建編程入口類SparkConf conf = new SparkConf();conf.setMaster("local[*]");conf.setAppName(JavaSparkWordCountOps.class.getSimpleName());JavaSparkContext jsc = new JavaSparkContext(conf);//step 2、加載外部數據 形成spark中的計算的編程模型RDDJavaRDD<String> linesRDD = jsc.textFile("E:/hello.txt");// step 3、對加載的數據進行各種業務邏輯操作---轉換操作transformationJavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {public Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split("\\s+")).iterator();}});//JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());System.out.println("-----經過拆分之后的rdd數據----");wordsRDD.foreach(new VoidFunction<String>() {public void call(String s) throws Exception {System.out.println(s);}});System.out.println("-----word拼裝成鍵值對----");JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {public Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1);}});//JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(word -> new Tuple2<String, Integer>(word, 1));pairsRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});System.out.println("------按照相同的key,統計value--------------");JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {public Integer call(Integer v1, Integer v2) throws Exception {int i = 1 / 0; //印證出這些轉換的transformation算子是懶加載的,需要action的觸發return v1 + v2;}});//JavaPairRDD<String, Integer> retRDD = pairsRDD.reduceByKey((v1, v2) -> v1 + v2);retRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {public void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t._1 + "--->" + t._2);}});} }scala版本
object SparkWordCountOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("SparkWordCount")val sc = new SparkContext(conf)//load data from fileval linesRDD:RDD[String] = sc.textFile("E:/hello.txt")val wordsRDD:RDD[String] = linesRDD.flatMap(line => line.split("\\s+"))val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))val ret = pairsRDD.reduceByKey((v1, v2) => v1 + v2)ret.foreach(t => println(t._1 + "---" + t._2))sc.stop()} }5.2? Master URL
master-url通過sparkConf.setMaster來完成。代表的是spark作業的執行方式,或者指定的spark程序的cluster-manager的類型。
| local | 程序在本地運行,同時為本地程序提供一個線程來處理 |
| local[M] | 程序在本地運行,同時為本地程序分配M個工作線程來處理 |
| local[*] | 程序在本地運行,同時為本地程序分配機器可用的CPU core的個數工作線程來處理 |
| local[M, N] | 程序在本地運行,同時為本地程序分配M個工作線程來處理,如果提交程序失敗,會進行最多N次的重試 |
| spark://ip:port | 基于standalone的模式運行,提交撐到ip對應的master上運行 |
| spark://ip1:port1,ip2:port2 | 基于standalone的ha模式運行,提交撐到ip對應的master上運行 |
| yarn/啟動腳本中的deploy-mode配置為cluster | 基于yarn模式的cluster方式運行,SparkContext的創建在NodeManager上面,在yarn集群中 |
| yarn/啟動腳本中的deploy-mode配置為client | 基于yarn模式的client方式運行,SparkContext的創建在提交程序的那臺機器上面,不在yarn集群中 |
5.3?spark日志的管理
(1)全局管理:項目classpath下面引入log4j.properties配置文件進行管理
# 基本日志輸出級別為INFO,輸出目的地為console log4j.rootCategory=INFO, consolelog4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# 輸出配置的是spark提供的日志級別 log4j.logger.org.spark_project.jetty=INFO log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR(2)局部管理 :就是在當前類中進行日志的管理。
import org.apache.log4j.{Level, Logger} Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.spark_project").setLevel(Level.WARN)5.4?WordCount案例程序的執行過程
當deploy-mode為client模式的時候,driver就在我們提交作業的本機,而spark的作業對應的executor在spark集群中運行。
在上圖中可以發現相鄰兩個rdd之間有依賴關系,依賴分為寬依賴和窄依賴。
窄依賴:rdd中的partition中的數據只依賴于父rdd中的一個partition或者常數個partition。常見的窄依賴操作有:flatMap,map,filter,coalesce等
寬依賴:rdd中的partition中的數據只依賴于父rdd中的所有partition。常見的寬依賴操作有reduceByKey,groupByKey,join,sortByKey,repartition等
rdd和rdd之間的依賴關系構成了一個鏈條,這個鏈條稱之為lineage(血緣)
6 Spark作業運行架構圖(standalone模式)
①啟動spark集群:通過spark的start-all.sh腳本啟動spark集群,啟動了對應的Master進程和Worker進程
②Worker啟動之后向Master進程發送注冊信息
③Worker向Master注冊成功之后,worker要不斷的向master發送心跳包,去監聽主節點是否存在
④Driver向Spark集群提交作業,就是向Master提交作業,申請運行資源
⑤Master收到Driver的提交請求,向Worker節點指派相應的作業任務,就是在對應的Worker節點上啟動對應的executor進程
⑥Worker節點接收到Master節點啟動executor任務之后,就啟動對應的executor進程,向master匯報成功啟動,可以接收任務
⑦executor進程啟動之后,就像Driver進程進行反向注冊,告訴Driver誰可以執行spark任務
⑧Driver接收到注冊之后,就知道向誰發送spark作業,那么這樣在spark集群中就有一組獨立的executor進程為該Driver服務
⑨DAGScheduler根據編寫的spark作業邏輯,將spark作業分成若干個階段Stage(基于Spark的transformation里是否有shuffle Dependency),然后為每一個階段組裝一批task組成taskSet(task里面包含了序列化之后的我們編寫的spark transformation),然后將這些DAGScheduler組裝好的taskSet,交給taskScheduler,由taskScheduler將這些任務發給對應的executor
⑩executor進程接收到了Driver發送過來的taskSet之后,進行反序列化,然后將這些task封裝進一個叫tasksunner的線程中,然后放到本地線程池中調度我們的作業的執行。
7?RDD操作
7.1?RDD初始化
RDD的初始化,原生api提供的2中創建方式:
①是讀取文件textFile
②加載一個scala集合parallelize。
當然,也可以通過transformation算子來創建的RDD。
7.2?RDD操作
RDD操作算子的分類,基本上分為兩類:transformation和action,當然更加細致的分,可以分為輸入算子,轉換算子,緩存算子,行動算子。
輸入:在Spark程序運行中,數據從外部數據空間(如分布式存儲:textFile讀取HDFS等,parallelize方法輸入Scala集合或數據)輸入Spark,數據進入Spark運行時數據空間,轉化為Spark中的數據塊,通過BlockManager進行管理。
運行:在Spark數據輸入形成RDD后便可以通過變換算子,如filter等,對數據進行操作并將RDD轉化為新的RDD,通過Action算子,觸發Spark提交作業。 如果數據需要復用,可以通過Cache算子,將數據緩存到內存。
輸出:程序運行結束數據會輸出Spark運行時空間,存儲到分布式存儲中(如saveAsTextFile輸出到HDFS),或Scala數據或集合中(collect輸出到Scala集合,count返回Scala int型數據)。
7.3?transformation轉換算子
(1)map
rdd.map(func):RDD,對rdd集合中的每一個元素,都作用一次該func函數,之后返回值為生成元素構成的一個新的RDD。
(2)flatMap
rdd.flatMap(func):RDD ==>rdd集合中的每一個元素,都要作用func函數,返回0到多個新的元素,這些新的元素共同構成一個新的RDD。
map操作是一個一到一的操作,flatMap操作是一個1到多的操作
(3)filter
rdd.filter(func):RDD ==> 對rdd中的每一個元素操作func函數,該函數的返回值為Boolean類型,保留返回值為true的元素,共同構成一個新的RDD,過濾掉哪些返回值為false的元素。
(4)sample
rdd.sample(withReplacement:Boolean, fraction:Double [, seed:Long]):RDD ===> 抽樣,sample抽樣不是一個精確的抽樣。一個非常重要的作用,就是來看rdd中數據的分布情況,根據數據分布的情況,進行各種調優與優化,防止數據傾斜。
withReplacement:抽樣的方式,true有放回抽樣, false為無返回抽樣
fraction: 抽樣比例,取值范圍就是0~1
seed: 抽樣的隨機數種子,有默認值,通常也不需要傳值
(5)union
rdd1.union(rdd2),聯合rdd1和rdd2中的數據,形成一個新的rdd,其作用相當于sql中的union all。
(6)join
join就是sql中的inner join。
注意:要想兩個RDD進行連接,那么這兩個rdd的數據格式,必須是k-v鍵值對的,其中的k就是關聯的條件,也就是sql中的on連接條件。
RDD1的類型[K, V], RDD2的類型[K, W]
內連接 :val joinedRDD:RDD[(K, (V, W))] = rdd1.join(rdd2)
左外連接 :val leftJoinedRDD:RDD[(K, (V, Option[W]))] = rdd1.leftOuterJoin(rdd2)
右外連接 :val rightJoinedRDD:RDD[(K, (Option[V], W))] = rdd1.rightOuterJoin(rdd2)
全連接 :val fullJoinedRDD:RDD[(K, (Option[V], Option[W]))] = rdd1.fullOuterJoin(rdd2)
(7)groupByKey
rdd.groupByKey(),按照key進行分組,如果原始rdd的類型時[(K, V)] ,那必然其結果就肯定[(K, Iterable[V])],是一個shuffle dependency寬依賴shuffle操作,但是這個groupByKey不建議在工作過程中使用,除非非要用,因為groupByKey沒有本地預聚合,性能較差,一般我們能用下面的reduceByKey或者combineByKey或者aggregateByKey代替就盡量代替。
(8)reduceByKey
rdd.reduceByKey(func:(V, V) => V):RDD[(K, V)] :在scala集合中學習過一個reduce(func:(W, W) => W)操作,是一個聚合操作,這里的reduceByKey按照就理解為在groupByKey(按照key進行分組[(K, Iterable[V])])的基礎上,對每一個key對應的Iterable[V]執行reduce操作。
同時reduceByKey操作會有一個本地預聚合的操作,所以是一個shuffle dependency寬依賴shuffle操作。
(9)sortByKey
按照key進行排序
(10)combineByKey
這是spark最底層的聚合算子之一,按照key進行各種各樣的聚合操作,spark提供的很多高階算子,都是基于該算子實現的。
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = {... }createCombiner: V => C, 相同的Key在分區中會調用一次該函數,用于創建聚合之后的類型,為了和后續Key相同的數據進行聚合;mergeValue: (C, V) => C, 在相同分區中基于上述createCombiner基礎之上的局部聚合;mergeCombiners: (C, C) => C) 將每個分區中相同key聚合的結果在分區間進行全局聚合
(11)aggregateByKey
aggregateByKey[U:?ClassTag](zeroValue:?U)(seqOp:?(U, V)?=> U, combOp:?(U, U)?=> U):?RDD[(K, U)]和combineByKey都是一個相對底層的聚合算子,可以完成系統沒有提供的其它操作,相當于自定義算子。aggregateByKey底層使用combineByKeyWithClassTag來實現,所以本質上二者沒啥區別,區別就在于使用時的選擇而已。
aggregateByKey更為簡單,但是如果聚合前后數據類型不一致,建議使用combineByKey;同時如果初始化操作較為復雜,也建議使用combineByKey。
7.3?action行動算子
這些算子都是在rdd上的分區partition上面執行的,不是在driver本地執行。
(1)foreach
用于遍歷RDD,將函數f應用于每一個元素,無返回值(action算子)
(2)count
統計該rdd中元素的個數
(3)take(n)
返回該rdd中的前N個元素,如果該rdd的數據是有序的,那么take(n)就是Top N
(4)first
take(n)中比較特殊的一個take(1)(0)
(5)collect
將分布在集群中的各個partition中的數據拉回到driver中,進行統一的處理;但是這個算子有很大的風險存在,第一,driver內存壓力很大,第二數據在網絡中大規模的傳輸,效率很低;所以一般不建議使用,如果非要用,請先執行filter。
(6)reduce
reduce是一個action操作,reduceByKey是一個transformation。reduce對一個rdd執行聚合操作,并返回結果,結果是一個值。
(7)countByKey
統計key出現的次數
(8)saveAsTextFile
保存到文件,本質上是saveAsHadoopFile[TextOutputFormat[NullWritable, Text]]
(9)saveAsObjectFile和saveAsSequenceFile
saveAsObjectFile本質上是saveAsSequenceFile
(10)saveAsHadoopFile和saveAsNewAPIHadoopFile
這二者的主要區別就是OutputFormat的區別,接口org.apache.hadoop.mapred.OutputFormat,
抽象類org.apache.hadoop.mapreduce.OutputFormat? ?所以saveAshadoopFile使用的是接口OutputFormat,saveAsNewAPIHadoopFile使用的抽象類OutputFormat,建議使用后者。
8?高級排序
8.1?普通的排序
(1)sortByKey
object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 數據類型為k-v,且是按照key進行排序val stuRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))//按照學生身高進行降序排序val height2Stu = stuRDD.map(stu => (stu.height, stu))//注意:sortByKey是局部排序,不是全局排序,如果要進行全局排序,// 必須將所有的數據都拉取到一臺機器上面才可以val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)sorted.foreach{case (height, stu) => println(stu)}sc.stop()} }case class Student(id:Int, name:String, age:Int, height:Double)(2)sortBy
這個sortBy其實使用sortByKey來實現,但是比sortByKey更加靈活,因為sortByKey只能應用在k-v數據格式上,而這個sortBy可以應在非k-v鍵值對的數據格式上面。
val sortedBy = stuRDD.sortBy(stu => stu.height,ascending = true,numPartitions = 1)(new Ordering[Double](){override def compare(x: Double, y: Double) = y.compareTo(x)},ClassTag.Double.asInstanceOf[ClassTag[Double]]) sortedBy.foreach(println)sortedBy的操作,除了正常的升序,分區個數以外,還需需要傳遞一個將原始數據類型,提取其中用于排序的字段;并且提供用于比較的方式,以及在運行時的數據類型ClassTag標記型trait。
(3)takeOrdered
takeOrdered也是對rdd進行排序,但是和上述的sortByKey和sortBy相比較,takeOrdered是一個action操作,返回值為一個集合,而前兩者為transformation,返回值為rdd。如果我們想在driver中獲取排序之后的結果,那么建議使用takeOrdered,因為該操作邊排序邊返回。其實是take和sortBy的一個結合體。
takeOrdered(n),獲取排序之后的n條記錄
//先按照身高降序排序,身高相對按照年齡升序排 ---> 二次排序 stuRDD.takeOrdered(3)(new Ordering[Student](){override def compare(x: Student, y: Student) = {var ret = y.height.compareTo(x.height)if(ret == 0) {ret = x.age.compareTo(y.age)}ret} }).foreach(println)8.2 二次排序
所謂二次排序,指的是排序字段不唯一,有多個,共同排序
object SortByKeyOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SortByKeyOps").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 數據類型為k-v,且是按照key進行排序val personRDD:RDD[Student] = sc.parallelize(List(Student(1, "refuel01", 19, 168),Student(2, "refuel02", 25, 175),Student(3, "refuel03", 25, 176),Student(4, "refuel04", 16, 180),Student(5, "refuel05", 18, 168.5)))personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))sc.stop()} }case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {//對學生的身高和年齡依次排序override def compare(that: Person) = {var ret = this.height.compareTo(that.height)if(ret == 0) {ret = this.age.compareTo(that.age)}ret} }8.3?分組TopN
在分組的情況之下,獲取每個組內的TopN數據
object GroupSortTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目進行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目內排序,不是科目間的排序,所以需要把每個科目的信息匯總val course2Infos:RDD[(String, Iterable[String])] = course2Info.groupByKey()//按照key進行分組//分組內的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()} }8.4?優化分組TopN
上述在編碼過程當中使用groupByKey,我們說著這個算子的性能很差,因為沒有本地預聚合,所以應該在開發過程當中盡量避免使用,能用其它代替就代替。
(1)使用combineByKey優化1
object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/topn.txt")//按照科目進行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目內排序,不是科目間的排序,所以需要把每個科目的信息匯總val course2Infos= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)//分組內的排序val sorted:RDD[(String, mutable.TreeSet[String])] = course2Infos.map{case (course, infos) => {val topN = mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})for(info <- infos) {topN.add(info)}(course, topN.take(3))}}sorted.foreach(println)sc.stop()}def createCombiner(info:String): ArrayBuffer[String] = {val ab = new ArrayBuffer[String]()ab.append(info)ab}def mergeValue(ab:ArrayBuffer[String], info:String): ArrayBuffer[String] = {ab.append(info)ab}def mergeCombiners(ab:ArrayBuffer[String], ab1: ArrayBuffer[String]): ArrayBuffer[String] = {ab.++:(ab1)} }此時這種寫法和上面的groupByKey性能一模一樣,沒有任何的優化。
(2)使用combineByKey的優化2
object GroupSortByCombineByKeyTopN {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupSortByCombineByKeyTopN").setMaster("local[2]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data/spark/topn.txt")//按照科目進行排序val course2Info:RDD[(String, String)] = lines.map(line => {val spaceIndex = line.indexOf(" ")val course = line.substring(0, spaceIndex)val info = line.substring(spaceIndex + 1)(course, info)})//按照科目排序,指的是科目內排序,不是科目間的排序,所以需要把每個科目的信息匯總val sorted= course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)sorted.foreach(println)sc.stop()}def createCombiner(info:String): mutable.TreeSet[String] = {val ts = new mutable.TreeSet[String]()(new Ordering[String](){override def compare(x: String, y: String) = {val xScore = x.split("\\s+")(1)val yScore = y.split("\\s+")(1)yScore.compareTo(xScore)}})ts.add(info)ts}def mergeValue(ab:mutable.TreeSet[String], info:String): mutable.TreeSet[String] = {ab.add(info)if(ab.size > 3) {ab.take(3)} else {ab}}def mergeCombiners(ab:mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {for (info <- ab1) {ab.add(info)}if(ab.size > 3) {ab.take(3)} else {ab}} }?
9?持久化操作
9.1 為什要持久化
一個RDD如果被多次操作,為了提交后續的執行效率,我們建議對該RDD進行持久化操作。
9.2?如何進行持久化
rdd.persist()/cache()就完成了rdd的持久化操作,我們可以將該rdd的數據持久化到內存,磁盤,等等。
如果我們已經不再對該rdd進行一個操作,而此時程序并沒有終止,可以卸載已經持久化的該rdd數據,rdd.unPersist()。
9.3?持久化策略
可以通過persist(StoreageLevle的對象)來指定持久化策略,eg:StorageLevel.MEMORY_ONLY。
| MEMORY_ONLY(默認) | rdd中的數據以未經序列化的java對象格式,存儲在內存中。如果內存不足,剩余的部分不持久化,使用的時候,沒有持久化的那一部分數據重新加載。這種效率是最高,但是是對內存要求最高的。 |
| MEMORY_ONLY_SER | 就比MEMORY_ONLY多了一個SER序列化,保存在內存中的數據是經過序列化之后的字節數組,同時每一個partition此時就是一個比較大的字節數組。 |
| MEMORY_AND_DISK | 和MEMORY_ONLY相比就多了一個,內存存不下的數據存儲在磁盤中。 |
| MEMEORY_AND_DISK_SER | 比MEMORY_AND_DISK多了個序列化。 |
| DISK_ONLY | 就是MEMORY_ONLY對應,都保存在磁盤,效率太差,一般不用。 |
| xxx_2 | 就是上述多個策略后面加了一個_2,比如MEMORY_ONLY_2,MEMORY_AND_DISK_SER_2等等,就多了一個replicate而已,備份,所以性能會下降,但是容錯或者高可用加強了。所以需要在二者直接做權衡。如果說要求數據具備高可用,同時容錯的時間花費比從新計算花費時間少,此時便可以使用,否則一般不用。 |
| HEAP_OFF(experimental) | 使用非Spark的內存,也即堆外內存,比如Tachyon,HBase、Redis等等內存來補充spark數據的緩存。 |
9.4?如何選擇持久化策略
(1)如果要持久化的數據是可以在內存中進行保存,那么毫無疑問,選擇MEMEORY_ONLY,因為這種方式的效率是最高的,但是在生成中往往要進行緩存的數據量還是蠻大的,而且因為數據都是未經序列化的java對象,所以很容易引起頻繁的gc。
(2)如果上述滿足不了,就退而求其次,MEMORY_ONLY_SER,這種方式增加的額外的性能開銷就是序列化和反序列化,經過反序列化之后的對象就是純java對象,因此性能還是蠻高的。
(3)如果還是扛不住,再退而求其次,MEMOEY_AND_DISK_SER,因為到這一步的話,那說明對象體積確實很多,為了提交執行效率,應該盡可能的將數據保存在內存,所以就對數據進行序列化,其次在序列化到磁盤。
(4)一般情況下DISK_ONLY,DISK_SER不用,效率太低,有時候真的不容從源頭計算一遍。
(5)一般情況下我們都不用XXX_2,代備份的種種持久化策略,除非程序對數據的安全性要求非常高,或者說備份的對性能的消耗低于從頭再算一遍,我們可以使用這種xxx_2以外,基本不用。
10 共享變量
10.1 概述
如果transformation使用到Driver中的變量,在executor中執行的時候,就需要通過網絡傳輸到對應的executor,如果該變量很大,那么網絡傳輸一定會成為性能的瓶頸。Spark就提供了兩種有限類型的共享變量:累加器和廣播變量
10.2?broadcast廣播變量
廣播變量:為每個task都拷貝一份變量,將變量包裝成為一個廣播變量(broadcast),只需要在executor中拷貝一份,在task運行的時候,直接從executor調用即可,相當于局部變量變成成員變量,性能就得到了提升。
val num:Any = xxxval numBC:Broadcast[Any] = sc.broadcast(num)調用:val n = numBC.value注意:該num需要進行序列化。10.3?accumulator累加器
累加器的一個好處是,不需要修改程序的業務邏輯來完成數據累加,同時也不需要額外的觸發一個action job來完成累加,反之必須要添加新的業務邏輯,必須要觸發一個新的action job來完成,顯然這個accumulator的操作性能更佳!
構建一個累加器val accu = sc.longAccumuator()累加的操作accu.add(參數)獲取累加器的結果val ret = accu.value val conf = new SparkConf() .setAppName("AccumulatorOps") .setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/data.txt") val words = lines.flatMap(_.split("\\s+"))//統計每個單詞出現的次數 val accumulator = sc.longAccumulatorval rbk = words.map(word => {if(word == "is")accumulator.add(1)(word, 1) }).reduceByKey(_+_) rbk.foreach(println) println("================使用累加器===================") println("is: " + accumulator.value)Thread.sleep(1000000) sc.stop()注意:累加器的調用,在action之后被調用,也就是說累加器必須在action觸發之后;多次使用同一個累加器,應該盡量做到用完即重置;盡量給累加器指定name,方便我們在web-ui上面進行查看
10.4?自定義累加器
自定義一個類繼承AccumulatorV2,重寫方法
/*自定義累加器IN 指的是accmulator.add(sth.)中sth的數據類型OUT 指的是accmulator.value返回值的數據類型*/ class MyAccumulator extends AccumulatorV2[String, Map[String, Long]] {private var map = mutable.Map[String, Long]()/*** 當前累加器是否有初始化值* 如果為一個long的值,0就是初始化值,如果為list,Nil就是初始化值,是map,Map()就是初始化值*/override def isZero: Boolean = trueoverride def copy(): AccumulatorV2[String, Map[String, Long]] = {val accu = new MyAccumulatoraccu.map = this.mapaccu}override def reset(): Unit = map.clear()//分區內的數據累加 is: 5, of:4override def add(word: String): Unit = {if(map.contains(word)) {val newCount = map(word) + 1map.put(word, newCount)} else {map.put(word, 1)} // map.put(word, map.getOrElse(word, 0) + 1)}//多個分區間的數據累加override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {other.value.foreach{case (word, count) => {if(map.contains(word)) {val newCount = map(word) + countmap.put(word, newCount)} else {map.put(word, count)} // map.put(word, map.getOrElse(word, 0) + count)}}}override def value: Map[String, Long] = map.toMap }注冊使用:
object _08AccumulatorOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("$AccumulatorOps").setMaster("local[*]")val sc = new SparkContext(conf)val lines = sc.textFile("file:/E:/data.txt")val words = lines.flatMap(_.split("\\s+"))//注冊val myAccu = new MyAccumulator()sc.register(myAccu, "myAccu")//統計每個單詞出現的次數val pairs = words.map(word => {if(word == "is" || word == "of" || word == "a")myAccu.add(word)(word, 1)})val rbk = pairs.reduceByKey(_+_)rbk.foreach(println)println("=============累加器==========")myAccu.value.foreach(println)Thread.sleep(10000000)sc.stop()} }?
總結
以上是生活随笔為你收集整理的SparkCore基础的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ClickHouse表引擎
- 下一篇: RSA公钥体系 与在 ssh中免密的登陆