Spark transformation operator examples
Spark supports two kinds of RDD operations: transformations and actions.

This article demonstrates several commonly used transformation operators, with example code written in both Java and Scala. The Java version carries detailed comments explaining how each operator works.
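Transformations are lazy: calling one only records the new RDD's lineage, and nothing is actually computed until an action is invoked. The following minimal sketch is not part of the original examples; the class name and the Java 8 lambda style are my own additions, but it uses the same Spark Java API as the code below. It shows a map transformation that does no work until collect(), an action, forces evaluation.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyEvaluationSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("lazySketch").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3));

        // Transformation: nothing is computed yet; Spark only records the lineage.
        JavaRDD<Integer> doubled = numbers.map(n -> n * 2);

        // Action: collect() triggers the actual job and returns the results to the driver.
        List<Integer> result = doubled.collect();
        System.out.println(result); // prints [2, 4, 6]

        sc.close();
    }
}

The same laziness applies to every transformation shown below, which is why each example ends with an action (foreach) so that something is actually printed.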
Commonly used transformation operators
Java version
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

@SuppressWarnings(value = {"unused"})
public class TransformationOperation {

    public static void main(String[] args) {
        // map();
        // filter();
        // flatMap();
        // groupByKey();
        // reduceByKey();
        // sortByKey();
        // join();
        cogroup();
    }

    /**
     * map example: multiply every element of the collection by 2
     */
    private static void map() {
        // Create the SparkConf
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
        // Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Build the collection
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        // Parallelize the collection to create the initial RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        // Use the map operator to multiply every element by 2.
        // map can be called on an RDD of any element type.
        // In Java, map takes a Function object; when creating the Function you must
        // specify its second type parameter, which is the type of the new elements,
        // and the return type of call() must match that type parameter.
        // Inside call() you can process each element of the original RDD however you
        // like and return a new element; all the new elements form the new RDD.
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(
                new Function<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;
                    // call() receives 1, 2, 3, 4, 5
                    // and returns 2, 4, 6, 8, 10
                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                });
        // Print the new RDD
        multipleNumberRDD.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Integer t) throws Exception {
                System.out.println(t);
            }
        });
        // Close the JavaSparkContext
        sc.close();
    }

    /**
     * filter example: keep only the even numbers in the collection
     */
    private static void filter() {
        // Create the SparkConf
        SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
        // Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock collection
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // Parallelize the collection to create the initial RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        // Apply filter to the initial RDD to keep only the even numbers.
        // filter also takes a Function and is used just like map, with one difference:
        // call() must return Boolean.
        // Each element of the initial RDD is passed to call(), where you apply whatever
        // logic you need to decide whether you want the element.
        // Return true to keep the element in the new RDD, false to drop it.
        JavaRDD<Integer> evenNumberRDD = numberRDD.filter(
                new Function<Integer, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    // 1 through 10 are all passed in here, but only the even numbers
                    // 2, 4, 6, 8, 10 return true, so only they are kept in the new RDD
                    @Override
                    public Boolean call(Integer v1) throws Exception {
                        return v1 % 2 == 0;
                    }
                });
        // Print the new RDD
        evenNumberRDD.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Integer t) throws Exception {
                System.out.println(t);
            }
        });
        // Close the JavaSparkContext
        sc.close();
    }

    /**
     * flatMap example: split lines of text into individual words.
     * Note the difference between flatMap and map:
     * map applies the given operation to each input and returns exactly one object per input;
     * flatMap does two things: (1) like map, it applies the given operation to each input,
     * but may return several objects per input, and (2) it then flattens all of the
     * returned objects into a single RDD.
     */
    private static void flatMap() {
        // Create the SparkConf
        SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
        // Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Build the collection
        List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
        // Parallelize the collection to create the RDD
        JavaRDD<String> lines = sc.parallelize(lineList);
        // Apply flatMap to split each line into words.
        // In Java, flatMap takes a FlatMapFunction, whose second type parameter U is the
        // type of the new elements; call() returns Iterable<U>, not U.
        // flatMap receives each element of the original RDD, processes it, and may return
        // multiple elements wrapped in an Iterable (an ArrayList, for example).
        // The new RDD contains all of those elements; here it ends up with more elements
        // than the original RDD, one per word.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });
        // Print the new RDD
        words.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });
        // Close the JavaSparkContext
        sc.close();
    }

    /**
     * groupByKey example: group the scores by class
     */
    private static void groupByKey() {
        // Create the SparkConf
        SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
        // Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock collection
        @SuppressWarnings("unchecked")
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 75),
                new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 65));
        // Parallelize the collection to create a JavaPairRDD.
        // Note: parallelizePairs() is used here, not parallelize(),
        // because we are creating a JavaPairRDD rather than a JavaRDD.
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
        // Apply groupByKey to group the scores by class.
        // groupByKey also returns a JavaPairRDD: the first type parameter stays the same,
        // but the second becomes an Iterable, because grouping by key means each key may
        // have several values, and those values are collected into an Iterable.
        // The resulting JavaPairRDD makes it easy to work with each group.
        JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
        // Print the groupedScores RDD
        groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                System.out.println("class:" + t._1);
                Iterator<Integer> ite = t._2.iterator();
                while (ite.hasNext()) {
                    System.out.println(ite.next());
                }
                System.out.println("====================");
            }
        });
        // Close the JavaSparkContext
        sc.close();
    }

    /**
     * reduceByKey example: compute the total score of each class
     */
    private static void reduceByKey() {
        // Create the SparkConf
        SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
        // Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock collection
        @SuppressWarnings("unchecked")
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 75),
                new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 65));
        // Parallelize the collection to create a JavaPairRDD
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
        // Apply reduceByKey to the scores RDD.
        // reduceByKey takes a Function2, which has three type parameters.
        // The first two are the type of the values in the original RDD: for each key,
        // the values are reduced pairwise, so the first two values are passed into call(),
        // their result is then combined with the next value, and so on.
        // The third type parameter is the type of each reduce step's result, which is
        // normally the same as the original value type.
        // reduceByKey returns another JavaPairRDD<key, value>.
        JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;
                    // For each key, its values are passed into call() in turn and
                    // aggregated into a single value; each key with its aggregated value
                    // becomes a Tuple2, a new element of the resulting RDD
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        // Print the totalScores RDD
        totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1 + ":" + t._2);
            }
        });
        // Close the JavaSparkContext
        sc.close();
    }

    /**
     * sortByKey example: sort students by score
     */
    private static void sortByKey() {
        // Create the SparkConf
        SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
        // Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock collection
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                new Tuple2<Integer, String>(65, "leo"),
                new Tuple2<Integer, String>(50, "tom"),
                new Tuple2<Integer, String>(100, "marry"),
                new Tuple2<Integer, String>(80, "jack"));
        // Parallelize the collection to create the RDD
        JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
        // Apply sortByKey to the scores RDD.
        // sortByKey sorts by key; you can choose the order explicitly:
        // false sorts from largest to smallest, true from smallest to largest.
        // It still returns a JavaPairRDD whose elements are exactly the same as the
        // original RDD's; only their order is different.
        JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
        // Print the sortedScores RDD
        sortedScores.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + ": " + t._2);
            }
        });
        // Close the JavaSparkContext
        sc.close();
    }

    /**
     * join example: print student scores
     */
    private static void join() {
        // Create the SparkConf
        SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
        // Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock collections
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<Integer, String>(1, "leo"),
                new Tuple2<Integer, String>(2, "jack"),
                new Tuple2<Integer, String>(3, "tom"));
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 60));
        // Parallelize the two RDDs
        JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
        // Use join to relate the two RDDs.
        // join matches elements by key and returns a JavaPairRDD.
        // Its first type parameter is the key type shared by both input RDDs, since the
        // join is performed on the key; the second is Tuple2<v1, v2>, whose two type
        // parameters are the value types of the two original RDDs.
        // Each element of the joined RDD is one pair of values matched by key.
        // For example, joining an RDD containing (1,1) (1,2) (1,3)
        // with an RDD containing (1,4) (2,1) (2,2)
        // produces (1,(1,4)) (1,(2,4)) (1,(3,4)).
        // No (2,(x,x)) element is produced, because join only matches keys that appear
        // in both RDDs, and only key 1 appears in both.
        JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
        // Print the studentScores RDD
        studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
                System.out.println("student id: " + t._1);
                System.out.println("student name: " + t._2._1);
                System.out.println("student score: " + t._2._2);
                System.out.println("=========================");
            }
        });
        // Close the JavaSparkContext
        sc.close();
    }

    /**
     * cogroup example: print student scores
     */
    private static void cogroup() {
        // Create the SparkConf
        SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");
        // Create the JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Mock collections
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<Integer, String>(1, "leo"),
                new Tuple2<Integer, String>(2, "jack"),
                new Tuple2<Integer, String>(3, "tom"));
        @SuppressWarnings("unchecked")
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 60),
                new Tuple2<Integer, Integer>(1, 70),
                new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(1, 70),
                new Tuple2<Integer, Integer>(3, 50));
        // Parallelize the two RDDs
        JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
        // cogroup differs from join: for each key, all of that key's values from each RDD
        // are collected into an Iterable, whereas join produces one output element per
        // matching pair of values instead of grouping them into Iterables.
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores =
                students.cogroup(scores);
        // Print the studentScores RDD
        studentScores.foreach(
                new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
                            throws Exception {
                        System.out.println("student id: " + t._1);
                        System.out.println("student name: " + t._2._1);
                        System.out.println("student score: " + t._2._2);
                        System.out.println("=========================");
                    }
                });
        // Close the JavaSparkContext
        sc.close();
    }
}
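As a side note, on Java 8 or later the anonymous inner classes above can be replaced by lambda expressions, since Spark's Java function types are single-method interfaces. This is only a hedged sketch of the map example in that style, not part of the original article; it assumes the same imports and setup as the full class above.

// Lambda-based equivalent of the map() example above (assumes Java 8+).
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(v1 -> v1 * 2);
// foreach is an action, so this is where the doubled values are actually printed.
multipleNumberRDD.foreach(t -> System.out.println(t));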
Scala version
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
// The following import is only needed on older Spark versions to pick up the
// implicit conversions that enable the pair-RDD operators (groupByKey, join, ...).
import org.apache.spark.SparkContext._

object TransformationOperation {

  def main(args: Array[String]): Unit = {
    // map()
    // filter()
    // flatMap()
    // groupByKey()
    // reduceByKey()
    // sortByKey()
    // join()
    cogroup()
  }

  def map(): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5)
    val numberRDD = sc.parallelize(numbers, 1)
    val multipleNumberRDD = numberRDD.map { num => num * 2 }
    multipleNumberRDD.foreach { num => println(num) }
  }

  def filter(): Unit = {
    val conf = new SparkConf().setAppName("filter").setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numbersRDD = sc.parallelize(numbers, 1)
    val evenNumberRDD = numbersRDD.filter { num => num % 2 == 0 }
    evenNumberRDD.foreach { num => println(num) }
  }

  def flatMap(): Unit = {
    val conf = new SparkConf().setAppName("flatMap").setMaster("local")
    val sc = new SparkContext(conf)
    val lineArray = Array("hello you", "hello me", "hello world")
    val lines = sc.parallelize(lineArray, 1)
    val words = lines.flatMap { line => line.split(" ") }
    words.foreach { word => println(word) }
  }

  def groupByKey(): Unit = {
    val conf = new SparkConf().setAppName("groupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(
      Tuple2("class1", 80),
      Tuple2("class2", 75),
      Tuple2("class1", 90),
      Tuple2("class2", 60))
    val scores = sc.parallelize(scoreList, 1)
    val groupedScores = scores.groupByKey()
    groupedScores.foreach(score => {
      println(score._1)
      score._2.foreach { singleScore => println(singleScore) }
      println("===================")
    })
  }

  def reduceByKey(): Unit = {
    val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(
      Tuple2("class1", 80),
      Tuple2("class2", 75),
      Tuple2("class1", 90),
      Tuple2("class2", 60))
    val scores = sc.parallelize(scoreList, 1)
    val totalScores = scores.reduceByKey(_ + _)
    totalScores.foreach(classScore => println(classScore._1 + ":" + classScore._2))
  }

  def sortByKey(): Unit = {
    val conf = new SparkConf().setAppName("sortByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(
      Tuple2(65, "leo"),
      Tuple2(50, "tom"),
      Tuple2(100, "marry"),
      Tuple2(85, "jack"))
    val scores = sc.parallelize(scoreList, 1)
    val sortedScores = scores.sortByKey(false)
    sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))
  }

  def join(): Unit = {
    val conf = new SparkConf().setAppName("join").setMaster("local")
    val sc = new SparkContext(conf)
    val studentList = Array(
      Tuple2(1, "leo"),
      Tuple2(2, "jack"),
      Tuple2(3, "tom"))
    val scoreList = Array(
      Tuple2(1, 100),
      Tuple2(2, 90),
      Tuple2(3, 60))
    val students = sc.parallelize(studentList, 1)
    val scores = sc.parallelize(scoreList, 1)
    val studentScores = students.join(scores)
    studentScores.foreach(studentScore => {
      println("student id:" + studentScore._1)
      println("student name:" + studentScore._2._1)
      println("student score:" + studentScore._2._2)
      println("=================================")
    })
  }

  def cogroup(): Unit = {
    val conf = new SparkConf().setAppName("cogroup").setMaster("local")
    val sc = new SparkContext(conf)
    val studentList = Array(
      Tuple2(1, "leo"),
      Tuple2(2, "jack"),
      Tuple2(3, "tom"))
    val scoreList = Array(
      Tuple2(1, 100),
      Tuple2(2, 90),
      Tuple2(3, 60),
      Tuple2(1, 70),
      Tuple2(2, 80),
      Tuple2(1, 70),
      Tuple2(3, 50))
    val students = sc.parallelize(studentList, 1)
    val scores = sc.parallelize(scoreList, 1)
    val studentScores = students.cogroup(scores)
    studentScores.foreach(studentScore => {
      println("student id:" + studentScore._1)
      println("student name:" + studentScore._2._1)
      println("student score:" + studentScore._2._2)
      println("=================================")
    })
  }
}
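As a quick sanity check, running the Scala reduceByKey example above against its sample data (class1: 80 + 90, class2: 75 + 60) should print the per-class totals below on the local console; the key order may differ between runs.

class1:170
class2:135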