Spark-自定义累加器-进行字符串拼接(代码及详细实现步骤)
生活随笔
收集整理的這篇文章主要介紹了
Spark-自定义累加器-进行字符串拼接(代码及详细实现步骤)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
看longAccumulator()方法源碼里是val acc = new LongAccumulator然后用register(acc)在Spark中注冊了累加器,進入ctrl+鼠標左鍵進入LongAccumulator,可以看到繼承了AccumulatorV2[jl.Long, jl.Long],根據LongAccumulator來實現自定義累加器
實現類
//1.繼承父類AccumulatorV2[IN,OUT](IN,OUT是Driver發到Executor的類型與Executor返回給Driver的類型) //2.實現抽象方法 //3.創建累加器 class WordAccumulator extends AccumulatorV2[String,util.ArrayList[String]] {val list = new util.ArrayList[String]()//當前的累加器是不是初始化狀態(這里是判斷創建的集合是不是空)override def isZero: Boolean = {list.isEmpty}//復制累加器對象override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {new WordAccumulator}//重置累加器對象(這里把集合清空即可)override def reset(): Unit = {list.clear()}//向累加器中增加數據override def add(v: String): Unit = {if (v.contains("h")){list.add(v)}}//合并累加器(不同executor返回會有個合并的過程)override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {list.addAll(other.value)}//獲取累加器的結果override def value: util.ArrayList[String] = list }然后是main函數
def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")//創建上下文對象val sc = new SparkContext(conf)val dataRDD:RDD[String] = sc.makeRDD(List("chun1","chun2","chun3","chun4"),2)// TODO 創建累加器val wordAccumulator = new WordAccumulator()// TODO 注冊累加器sc.register(wordAccumulator)dataRDD.foreach{case word=>{//TODO 執行累加器的累加功能wordAccumulator.add(word)}}// TODO 獲取累加結果println(wordAccumulator.value)} 結果:[chun1, chun2, chun3, chun4]完整代碼
package date_9_23import java.utilimport org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext}/*** 自定義累加器*/ object Spark4_LongAccumulator {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")//創建上下文對象val sc = new SparkContext(conf)val dataRDD:RDD[String] = sc.makeRDD(List("chun1","chun2","chun3","chun4"),2)// TODO 創建累加器val wordAccumulator = new WordAccumulator()// TODO 注冊累加器sc.register(wordAccumulator)dataRDD.foreach{case word=>{//TODO 執行累加器的累加功能wordAccumulator.add(word)}}// TODO 獲取累加結果println(wordAccumulator.value)} }//聲明累加器 //1.繼承父類AccumulatorV2[IN,OUT](IN,OUT是Driver發到Executor的類型與Executor返回給Driver的類型) //2.實現抽象方法 //3.創建累加器 class WordAccumulator extends AccumulatorV2[String,util.ArrayList[String]] {val list = new util.ArrayList[String]()//當前的累加器是不是初始化狀態(這里是判斷創建的集合是不是空)override def isZero: Boolean = {list.isEmpty}//復制累加器對象override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {new WordAccumulator}//重置累加器對象(這里把集合清空即可)override def reset(): Unit = {list.clear()}//向累加器中增加數據override def add(v: String): Unit = {if (v.contains("h")){list.add(v)}}//合并累加器(不同executor返回會有個合并的過程)override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {list.addAll(other.value)}//獲取累加器的結果override def value: util.ArrayList[String] = list } 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Spark-自定义累加器-进行字符串拼接(代码及详细实现步骤)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AMD、Intel核战之外还要飚速:首款
- 下一篇: 世界最强的纸飞机能飞多远?