Spark _03RDD_Transformations_Action_使用scalajavaAPI
生活随笔
收集整理的這篇文章主要介紹了
Spark _03RDD_Transformations_Action_使用scalajavaAPI
小編覺得挺不錯的，現在分享給大家，幫大家做個參考。
Scala API：Transformations轉換算子 & Action行動算子
【友情提示】代碼頁，請從下往上看。
package ddd.henu.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demo of lazy transformation operators and eager action operators.
 *
 * Contents of data/words.txt:
 *   hello world
 *   hello henu
 *   hello george
 *   hello honey
 *   a li
 *   ba ba
 */
object TransformationsDemo {

  def main(args: Array[String]): Unit = {
    // Local-mode context for the demo.
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    // Quiet the console: only log at error level.
    sc.setLogLevel("Error")

    val lines = sc.textFile("./data/words.txt")

    // take: fetch the first N records to the driver.
    val firstFive: Array[String] = lines.take(5)
    firstFive.foreach(println)

    // first: the first record; implemented as take(1) under the hood.
    /*
    val head = lines.first()
    println(head)
    */

    // collect: pulls the whole dataset into driver JVM memory.
    // Do NOT call it on large data — it will cause an OOM error.
    /*
    val all: Array[String] = lines.collect()
    all.foreach(println)
    */

    // count: total number of lines.
    /*
    val total: Long = lines.count()
    println(total)
    */

    // sample: random sampling. (Note: for this demo words.txt was
    // duplicated up to ~100 lines.)
    //   arg 1 (Boolean): true = sample with replacement
    //   arg 2: sampling fraction, roughly 100 * 0.1 records here
    //   arg 3: seed — fixing it makes every run sample the same rows
    // val sampled: RDD[String] = lines.sample(true, 0.1)
    // val sampled: RDD[String] = lines.sample(true, 0.1, 100L)
    // sampled.foreach(println)

    /*
    val tokens = lines.flatMap(_.split(" "))
    val pairs = tokens.map((_, 1))
    val counts = pairs.reduceByKey(_ + _)

    // Sorting via sortByKey needs two swaps (a roundabout approach):
    val swapped: RDD[(Int, String)] = counts.map(_.swap)
    val sortedByCount: RDD[(Int, String)] = swapped.sortByKey(false)
    // Swap back, otherwise results stay as (Int, String).
    sortedByCount.map(_.swap).foreach(println)
    */

    // Sorting by occurrence count with sortBy instead:
    // ascending:
    // counts.sortBy(_._2).foreach(println)
    // descending:
    // counts.sortBy(_._2, false).foreach(println)
    /*
     * (a,1)
     * (george,1)
     * (li,1)
     * (honey,1)
     * (henu,1)
     * (world,1)
     * (ba,2)
     * (hello,4)
     */

    // filter: keep only matching records.
    /*
    val tokens = lines.flatMap(_.split(" "))
    tokens.filter("hello".equals(_)).foreach(println)
    */
    /*
     * hello
     * hello
     * hello
     * hello
     */

    // flatMap: one-to-many.
    // lines.flatMap(_.split(" ")).foreach(println)
    /*
     * hello
     * world
     * hello
     * henu
     * hello
     * george
     * hello
     * honey
     * a
     * li
     * ba
     * ba
     */

    // map: one-to-one.
    /*
    lines.map(_ + "#").foreach(println)
    */
    /*
     * hello world#
     * hello henu#
     * hello george#
     * hello honey#
     * a li#
     * ba ba#
     */
  }
}

Java API：Transformations轉換算子 & Action行動算子
package eee;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import scala.Tuple2;import java.util.Arrays; import java.util.Iterator; import java.util.List;/*** @author George* @description**/ public class TransformationsDemo {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local");conf.setAppName("test");JavaSparkContext sc = new JavaSparkContext(conf);sc.setLogLevel("Error");JavaRDD<String> lines = sc.textFile("./data/words.txt");//sample 抽樣 當(dāng)抽樣的時(shí)候,給words中的數(shù)據(jù)進(jìn)行復(fù)制添加多點(diǎn)數(shù)據(jù),如100條//添加種子后,抽樣結(jié)果不在變化JavaRDD<String> sample = lines.sample(true, 0.1);sample.foreach(new VoidFunction<String>() {@Overridepublic void call(String s) throws Exception {System.out.println(s);}});//take/*List<String> take = lines.take(3);for (String s : take) {System.out.println(s);}*///first/*String first = lines.first();System.out.println(first);*///count/*long count = lines.count();System.out.println(count);*///collect/*List<String> collect = lines.collect();for (String s : collect) {System.out.println(s);}*///reduceByKey + 排序 沒(méi)有sortBy 只有sortByKey/*lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {return Arrays.asList(s.split(" ")).iterator();}}).mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word,1);}}).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {return tp.swap();}}).sortByKey(false).foreach(new 
VoidFunction<Tuple2<Integer, String>>() {@Overridepublic void call(Tuple2<Integer, String> integerStringTuple2) throws Exception {System.out.println(integerStringTuple2);}});*///flatmap/*JavaRDD<String> result = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {return Arrays.asList(s.split(" ")).iterator();}});result.foreach(new VoidFunction<String>() {@Overridepublic void call(String s) throws Exception {System.out.println(s);}});*///mapToPair 一對(duì)一 返回值為kv格式的使用mapToPair 返回String的用map/*JavaPairRDD<String, String> mapToPair = lines.mapToPair(new PairFunction<String, String, String>() {@Overridepublic Tuple2<String, String> call(String s) throws Exception {return new Tuple2<>(s, s + "#");}});*//*JavaRDD<String> map = lines.map(new Function<String, String>() {@Overridepublic String call(String line) throws Exception {return line + "*";}});map.foreach(new VoidFunction<String>() {@Overridepublic void call(String s) throws Exception {System.out.println(s);}});*//*JavaRDD<String> result = lines.filter(new Function<String, Boolean>() {@Overridepublic Boolean call(String line) throws Exception {return "hello george".equals(line);}});result.foreach(new VoidFunction<String>() {@Overridepublic void call(String s) throws Exception {System.out.println(s);}});System.out.println(result.count());*/sc.stop();} }?
總結(jié)
以上是生活随笔為你收集整理的Spark _03RDD_Transformations_Action_使用scalajavaAPI的全部內容，希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark _02SparkCore_R
- 下一篇: System.currentTimeMi