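Spark Java API: Transformation
mapPartitions
Official documentation:
Return a new RDD by applying a function to each partition of this RDD.
mapPartitions calls the partition function once for each partition in turn and builds the new RDD from the results (one Iterator per partition).
mapPartitions is similar to map, but when the mapping has to create extra objects repeatedly, mapPartitions is much more efficient than map. For example, when all the data in an RDD is written to a database over JDBC, map may have to create a connection for every element, which is expensive, whereas mapPartitions only needs one connection per partition.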
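The cost difference described above can be illustrated without Spark. The following is a minimal plain-Java sketch, not Spark code: the class `PartitionSetupSketch`, the counter `connectionsOpened`, and the nested lists standing in for partitions are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionSetupSketch {
    // Hypothetical counter standing in for an expensive resource such as a JDBC connection.
    static int connectionsOpened = 0;

    static void openConnection() {
        connectionsOpened++;
    }

    // map-style processing: the setup cost is paid once per element.
    static int writePerElement(List<List<Integer>> partitions) {
        connectionsOpened = 0;
        for (List<Integer> partition : partitions) {
            for (int i = 0; i < partition.size(); i++) {
                openConnection(); // one "connection" for every element
            }
        }
        return connectionsOpened;
    }

    // mapPartitions-style processing: the setup cost is paid once per partition.
    static int writePerPartition(List<List<Integer>> partitions) {
        connectionsOpened = 0;
        for (List<Integer> partition : partitions) {
            openConnection(); // one "connection" shared by the whole partition
            for (int i = 0; i < partition.size(); i++) {
                // the element partition.get(i) would be written here
            }
        }
        return connectionsOpened;
    }

    public static void main(String[] args) {
        // Two partitions, mirroring a 7-element list split into 2 slices.
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 4),
                Arrays.asList(3, 5, 6, 7));
        System.out.println("per element:   " + writePerElement(partitions));
        System.out.println("per partition: " + writePerPartition(partitions));
    }
}
```

With seven elements in two partitions, the per-element variant opens seven connections and the per-partition variant opens two.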
Function prototypes:
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U]
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U]
The first function is implemented on top of the second with preservesPartitioning set to false. The second lets us specify preservesPartitioning, which indicates whether the partitioner of the parent RDD is kept. The Iterator passed to the FlatMapFunction iterates over all the elements of one partition of this RDD.
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
// The RDD has two partitions
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 2);
// Compute the sum of each partition
JavaRDD<Integer> mapPartitionsRDD = javaRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
    // Spark 1.x signature: call returns an Iterable (in Spark 2.x and later it returns an Iterator)
    @Override
    public Iterable<Integer> call(Iterator<Integer> integerIterator) throws Exception {
        int isum = 0;
        while (integerIterator.hasNext())
            isum += integerIterator.next();
        LinkedList<Integer> linkedList = new LinkedList<Integer>();
        linkedList.add(isum);
        return linkedList;
    }
});
System.out.println("mapPartitionsRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsRDD.collect());
mapPartitionsWithIndex
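Official documentation:
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
mapPartitionsWithIndex is essentially the same as mapPartitions, except that the partition function takes two arguments: the first is the index of the partition being processed and the second is an Iterator over the elements of that partition.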
Function prototype:
def mapPartitionsWithIndex[R](f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R]
Source code analysis:
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}
The source shows that mapPartitions already receives the index of the partition being processed but does not pass it to the partition function, whereas mapPartitionsWithIndex does pass it on.
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
// The RDD has two partitions
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 2);
// Output the partition index, the element value and the element number, separated by "|"
JavaRDD<String> mapPartitionsWithIndexRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer v1, Iterator<Integer> v2) throws Exception {
        LinkedList<String> linkedList = new LinkedList<String>();
        int i = 0;
        while (v2.hasNext())
            linkedList.add(Integer.toString(v1) + "|" + v2.next().toString() + "|" + Integer.toString(i++));
        return linkedList.iterator();
    }
}, false);
System.out.println("mapPartitionsWithIndexRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsWithIndexRDD.collect());
sample
Official documentation:
Return a sampled subset of this RDD.
Function prototypes:
def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T]
- withReplacement: can elements be sampled multiple times (replaced when sampled out)
- fraction: expected size of the sample as a fraction of this RDD's size
- without replacement: probability that each element is chosen; fraction must be in [0, 1]
- with replacement: expected number of times each element is chosen; fraction must be >= 0
- seed: seed for the random number generator (second form only)
The first function is implemented on top of the second with seed set to Utils.random.nextLong. withReplacement selects which sampler is created, fraction is the sampling ratio, and seed is the seed of the random number generator.
Source code analysis:
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = withScope {
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}
sample first validates fraction and then builds a PartitionwiseSampledRDD, creating a Poisson sampler when withReplacement is true and a Bernoulli sampler otherwise.
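The difference between the two samplers can be sketched in plain Java. This is an illustration of the two sampling schemes only, not Spark's PoissonSampler or BernoulliSampler: the class `SamplingSketch`, its method names, the use of java.util.Random, and Knuth's method for drawing Poisson counts are all assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class SamplingSketch {
    // Without replacement: keep each element independently with probability `fraction`,
    // so an element appears at most once in the sample.
    static <T> List<T> bernoulliSample(List<T> data, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : data) {
            if (rng.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }

    // With replacement: emit each element k times, where k is drawn from a Poisson
    // distribution with mean `fraction` (Knuth's multiplication method).
    static <T> List<T> poissonSample(List<T> data, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> out = new ArrayList<>();
        double limit = Math.exp(-fraction);
        for (T item : data) {
            int k = 0;
            double p = rng.nextDouble();
            while (p > limit) {
                k++;
                p *= rng.nextDouble();
            }
            for (int i = 0; i < k; i++) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
        System.out.println("without replacement: " + bernoulliSample(data, 0.5, 100L));
        System.out.println("with replacement:    " + poissonSample(data, 1.5, 100L));
    }
}
```

With a fraction above 1 the Poisson variant can return more elements than the input holds, which is why fraction is only bounded by [0, 1] in the case without replacement.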
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
// false: Bernoulli sampling without replacement (each element is sampled at most once); 0.2: sampling fraction; 100: seed of the random number generator
JavaRDD<Integer> sampleRDD = javaRDD.sample(false, 0.2, 100);
System.out.println("sampleRDD~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD.collect());
// true: Poisson sampling with replacement (an element may be sampled several times); 0.2: sampling fraction; 100: seed of the random number generator
JavaRDD<Integer> sampleRDD1 = javaRDD.sample(true, 0.2, 100);
System.out.println("sampleRDD1~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD1.collect());
randomSplit
Official documentation:
Randomly splits this RDD with the provided weights.
Function prototypes:
def randomSplit(weights: Array[Double]): Array[JavaRDD[T]]
def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]]
- weights: weights for splits, will be normalized if they don't sum to 1
- seed: random seed (second form only)
- return: split RDDs in an array
Source code analysis:
def randomSplit(
    weights: Array[Double],
    seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
  val sum = weights.sum
  val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
  normalizedCumWeights.sliding(2).map { x =>
    randomSampleWithRange(x(0), x(1), seed)
  }.toArray
}
def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
  this.mapPartitionsWithIndex( { (index, partition) =>
    val sampler = new BernoulliCellSampler[T](lb, ub)
    sampler.setSeed(seed + index)
    sampler.sample(partition)
  }, preservesPartitioning = true)
}
The source shows that randomSplit first normalizes the weights so that they sum to 1 and accumulates them into range boundaries between 0 and 1. It then calls randomSampleWithRange for each pair of adjacent boundaries, passing the same seed every time. That function calls mapPartitionsWithIndex (described above) and builds a Bernoulli cell sampler for the range [lb, ub) to pick the elements of the split.
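The two steps, normalized cumulative weights followed by one range sampler per split, can be sketched in plain Java. This is an illustration under assumptions, not Spark's BernoulliCellSampler: `RandomSplitSketch` and its methods are hypothetical names, a single list stands in for one partition, and java.util.Random replaces Spark's generator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class RandomSplitSketch {
    // Normalize the weights and accumulate them into boundaries, e.g. {1, 1, 2} -> {0, 0.25, 0.5, 1}.
    static double[] cumulativeBounds(double[] weights) {
        double sum = 0.0;
        for (double w : weights) {
            sum += w;
        }
        double[] bounds = new double[weights.length + 1];
        for (int i = 0; i < weights.length; i++) {
            bounds[i + 1] = bounds[i] + weights[i] / sum;
        }
        return bounds;
    }

    // Keep the elements whose random draw falls into [lb, ub). Re-seeding with the same
    // seed reproduces the same draw for every element in every range.
    static List<Integer> sampleRange(List<Integer> data, double lb, double ub, long seed) {
        Random rng = new Random(seed);
        List<Integer> out = new ArrayList<>();
        for (Integer item : data) {
            double x = rng.nextDouble();
            if (x >= lb && x < ub) {
                out.add(item);
            }
        }
        return out;
    }

    static List<List<Integer>> randomSplit(List<Integer> data, double[] weights, long seed) {
        double[] bounds = cumulativeBounds(weights);
        List<List<Integer>> splits = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            splits.add(sampleRange(data, bounds[i], bounds[i + 1], seed));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
        System.out.println(randomSplit(data, new double[] {1, 1, 2}, 42L));
    }
}
```

Because every range replays the same sequence of draws, each element falls into exactly one range, so the splits are disjoint. The split sizes only follow the weights in expectation.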
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
double[] weights = {0.1, 0.2, 0.7};
// Randomly split the RDD according to the given weights
JavaRDD<Integer>[] randomSplitRDDs = javaRDD.randomSplit(weights);
System.out.println("randomSplitRDDs of size~~~~~~~~~~~~~~" + randomSplitRDDs.length);
int i = 0;
for (JavaRDD<Integer> item : randomSplitRDDs)
    System.out.println(i++ + " randomSplitRDDs of item~~~~~~~~~~~~~~~~" + item.collect());
union
Official documentation:
Return the union of this RDD and another one. Any identical elements will appear multiple times (use `.distinct()` to eliminate them).
Function prototype:
def union(other: JavaRDD[T]): JavaRDD[T]
union() simply merges two RDDs without changing the data inside the partitions. The RangeDependency is in effect also 1:1; it only keeps the range boundaries of the original RDDs so that the partitions of the RDD produced by union() are easy to access.
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
JavaRDD<Integer> unionRDD = javaRDD.union(javaRDD);
System.out.println("unionRDD~~~~~~~~~~~~~~~~~~~~~~" + unionRDD.collect());
intersection
Official documentation:
Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did. Note that this method performs a shuffle internally.
Function prototype:
def intersection(other: JavaRDD[T]): JavaRDD[T]
Source code analysis:
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
    .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
    .keys
}
First, map() turns RDD[T] into RDD[(T, null)]; T may be any type other than Array and similar collection types. Next comes a.cogroup(b) (cogroup is described in detail later). filter() then drops the records in which groupA or groupB of [iter(groupA()), iter(groupB())] is empty, giving a FilteredRDD. Finally, keys() keeps only the keys, giving a MappedRDD.
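The map, cogroup, filter, keys pipeline above can be mimicked on ordinary lists. The following plain-Java sketch is an illustration only: `IntersectionSketch` is a hypothetical name, and a LinkedHashMap with two counters per key stands in for the cogroup result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IntersectionSketch {
    static <T> List<T> intersection(List<T> left, List<T> right) {
        // cogroup step: key -> {occurrences on the left, occurrences on the right}
        Map<T, int[]> groups = new LinkedHashMap<>();
        for (T v : left) {
            groups.computeIfAbsent(v, k -> new int[2])[0]++;
        }
        for (T v : right) {
            groups.computeIfAbsent(v, k -> new int[2])[1]++;
        }
        // filter step: keep keys whose group is non-empty on both sides;
        // keys step: emit each surviving key once, which also removes duplicates.
        List<T> keys = new ArrayList<>();
        for (Map.Entry<T, int[]> e : groups.entrySet()) {
            if (e.getValue()[0] > 0 && e.getValue()[1] > 0) {
                keys.add(e.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(intersection(Arrays.asList(1, 2, 2, 3), Arrays.asList(2, 3, 3, 4)));
    }
}
```

Grouping by key is what guarantees that the output holds no duplicates even when both inputs do.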
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
JavaRDD<Integer> intersectionRDD = javaRDD.intersection(javaRDD);
System.out.println(intersectionRDD.collect());
coalesce
Official documentation:
Return a new RDD that is reduced into `numPartitions` partitions.
Function prototypes:
def coalesce(numPartitions: Int): JavaRDD[T]
def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]
Source code analysis:
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]
    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
      numPartitions).values
  } else {
    new CoalescedRDD(this, numPartitions)
  }
}
The source shows that when shuffle = false no shuffle is performed, so the problem reduces to deciding which partitions of the parent RDD can be merged; they are merged into the requested numPartitions partitions. When shuffle = true a shuffle is performed. The principle is simple: each record of a partition is first turned into a key-value pair whose value is the record and whose key starts at (new Random(index)).nextInt(numPartitions) + 1 for the first record and increases by one for each following record, where index is the index of the partition and numPartitions is the number of partitions in the CoalescedRDD. The shuffle then yields a ShuffledRDD in which the records are evenly distributed, a more involved algorithm establishes the data dependency between the ShuffledRDD and the CoalescedRDD, and finally the keys are dropped, giving the result of coalesce, a MappedRDD.
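The key assignment in the shuffle = true branch amounts to a round-robin distribution that starts at a pseudo-random offset. The plain-Java sketch below illustrates this for a single input partition; `CoalesceShuffleSketch` and `distribute` are hypothetical names, and the modulo operation stands in for what a HashPartitioner does with a non-negative Int key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class CoalesceShuffleSketch {
    // Distribute the records of one input partition over numPartitions output partitions.
    static List<List<Integer>> distribute(int index, List<Integer> items, int numPartitions) {
        List<List<Integer>> targets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            targets.add(new ArrayList<Integer>());
        }
        // Starting position derived from the partition index, as in distributePartition.
        int position = new Random(index).nextInt(numPartitions);
        for (Integer item : items) {
            position = position + 1;                          // the key of this record
            targets.get(position % numPartitions).add(item);  // key modulo partition count
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Integer> partition = Arrays.asList(1, 2, 4, 3, 5, 6);
        System.out.println(distribute(0, partition, 3));
    }
}
```

Whatever the starting offset, six consecutive keys hit each of the three output partitions exactly twice, which is why the records end up evenly spread.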
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
// shuffle defaults to false
JavaRDD<Integer> coalesceRDD = javaRDD.coalesce(2);
System.out.println(coalesceRDD);
JavaRDD<Integer> coalesceRDD1 = javaRDD.coalesce(2, true);
System.out.println(coalesceRDD1);
Note:
coalesce() can change the number of partitions of the parent RDD, for example from 5 down to 3 or from 5 up to 10. Note that with shuffle = false the number of partitions cannot be increased (that is, it cannot go from 5 to 10).
repartition
Official documentation:
Return a new RDD that has exactly numPartitions partitions. Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using `coalesce`, which can avoid performing a shuffle.
In particular, when repartition would be used to reduce the number of partitions of an RDD, coalesce with shuffle set to false can be used instead, which avoids the shuffle and is more efficient.
Function prototype:
def repartition(numPartitions: Int): JavaRDD[T]
Source code analysis:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
The source shows that repartition is equivalent to coalesce(numPartitions, shuffle = true).
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
// Equivalent to coalesce(numPartitions, shuffle = true)
JavaRDD<Integer> repartitionRDD = javaRDD.repartition(2);
System.out.println(repartitionRDD);
cartesian
Official documentation:
Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in `this` and b is in `other`.
Function prototype:
def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]
Source code analysis:
def getPartitions: Array[Partition] = {
  // create the cross product split
  val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
  for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
    val idx = s1.index * numPartitionsInRdd2 + s2.index
    array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
  }
  array
}
def getDependencies: Seq[Dependency[_]] = List(
  new NarrowDependency(rdd1) {
    def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
  },
  new NarrowDependency(rdd2) {
    def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
  }
)
cartesian computes the Cartesian product of two RDDs. The number of partitions in the resulting CartesianRDD is partitionNum(RDD a) * partitionNum(RDD b). getDependencies shows that the dependencies differ from the earlier cases: every partition of the CartesianRDD depends on both parent RDDs, fully depending (NarrowDependency) on one partition of RDD a and at the same time fully depending (NarrowDependency) on one partition of RDD b. Specifically, partition i of the CartesianRDD depends on (RDD a).List(i / numPartitionsInRDDb) and (RDD b).List(i % numPartitionsInRDDb).
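The index arithmetic in getPartitions and getDependencies is easy to check by hand. The plain-Java sketch below restates it; `CartesianPartitionSketch`, `childIndex`, and `parents` are hypothetical names introduced for illustration.

```java
public class CartesianPartitionSketch {
    // Index of the child partition built from partition s1 of RDD a and partition s2 of RDD b.
    static int childIndex(int s1, int s2, int numPartitionsInRdd2) {
        return s1 * numPartitionsInRdd2 + s2;
    }

    // Parent partitions of a child partition: {index in RDD a, index in RDD b}.
    static int[] parents(int childIndex, int numPartitionsInRdd2) {
        return new int[] {childIndex / numPartitionsInRdd2, childIndex % numPartitionsInRdd2};
    }

    public static void main(String[] args) {
        int numPartitionsInRdd1 = 2;
        int numPartitionsInRdd2 = 3;
        for (int i = 0; i < numPartitionsInRdd1 * numPartitionsInRdd2; i++) {
            int[] p = parents(i, numPartitionsInRdd2);
            System.out.println("child " + i + " <- a[" + p[0] + "], b[" + p[1] + "]");
        }
    }
}
```

Division and remainder by numPartitionsInRdd2 invert the formula used to build idx, so every child partition maps back to exactly one partition in each parent.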
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
JavaPairRDD<Integer, Integer> cartesianRDD = javaRDD.cartesian(javaRDD);
System.out.println(cartesianRDD.collect());
distinct
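Official documentation:
Return a new RDD containing the distinct elements in this RDD.
Function prototypes:
def distinct(): JavaRDD[T]
def distinct(numPartitions: Int): JavaRDD[T]
The first function is implemented on top of the second, with numPartitions defaulting to partitions.length, where partitions are the partitions of the parent RDD.
Source code analysis:
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
distinct() removes all duplicate data in the RDD. Because duplicates may be spread over different partitions, a shuffle is needed to aggregate them before deduplicating. However, a shuffle requires key-value records, so, as the source shows, each element x is first mapped to (x, null), reduceByKey() then keeps one record per key, and the final map() keeps only the key.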
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
JavaRDD<Integer> distinctRDD1 = javaRDD.distinct();
System.out.println(distinctRDD1.collect());
JavaRDD<Integer> distinctRDD2 = javaRDD.distinct(2);
System.out.println(distinctRDD2.collect());
aggregate
Official documentation:
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
Function prototype:
def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U
Source code analysis:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
aggregate aggregates the elements within each partition and then uses the combine function to combine the result of each partition with the initial value (zeroValue). The type U that it returns does not have to be the element type T of the RDD, so we need one function that merges a T into a U and another that merges two U's. The first argument is the initial value; the second, seqOp, merges each element of a partition into the running value, starting from the initial value; the third, combOp, merges the per-partition results.
Note: even when no number of partitions is specified, aggregate is computed partition by partition, and an empty partition yields the initial value. As the source shows, zeroValue is used once in every partition and once more when the partition results are merged.
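The role of the zero value is easiest to see with concrete numbers. The plain-Java sketch below models aggregate on lists that stand in for partitions; `AggregateSketch` is a hypothetical name, the split [5, 1], [1, 4], [4, 2, 2] is an assumed partitioning of the seven elements into three slices, and the merge order is fixed here whereas Spark merges task results as they arrive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class AggregateSketch {
    static int aggregate(List<List<Integer>> partitions, int zeroValue,
                         BinaryOperator<Integer> seqOp, BinaryOperator<Integer> combOp) {
        int jobResult = zeroValue;                 // the zero value seeds the final merge
        for (List<Integer> partition : partitions) {
            int partial = zeroValue;               // and also seeds every partition
            for (Integer element : partition) {
                partial = seqOp.apply(partial, element);
            }
            jobResult = combOp.apply(jobResult, partial);
        }
        return jobResult;
    }

    public static void main(String[] args) {
        // Assumed split of [5, 1, 1, 4, 4, 2, 2] into three partitions.
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(5, 1),
                Arrays.asList(1, 4),
                Arrays.asList(4, 2, 2));
        // seqOp keeps the maximum, combOp adds the partial results.
        System.out.println(aggregate(partitions, 3, Integer::max, Integer::sum));
    }
}
```

Under these assumptions the partition maxima are 5, 4 and 4, and adding them to the initial 3 gives 16. With a zero value that is not neutral for combOp, the result therefore depends on the number of partitions.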
Example:
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
Integer aggregateValue = javaRDD.aggregate(3, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        System.out.println("seq~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + v1 + "," + v2);
        return Math.max(v1, v2);
    }
}, new Function2<Integer, Integer, Integer>() {
    int i = 0;
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        System.out.println("comb~~~~~~~~~i~~~~~~~~~~~~~~~~~~~" + i++);
        System.out.println("comb~~~~~~~~~v1~~~~~~~~~~~~~~~~~~~" + v1);
        System.out.println("comb~~~~~~~~~v2~~~~~~~~~~~~~~~~~~~" + v2);
        return v1 + v2;
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + aggregateValue);
aggregateByKey
Official documentation:
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
Function prototypes:
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U]
def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U]
def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U]
Source code analysis:
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
}
aggregateByKey aggregates the values that share a key in a pair RDD, again using a neutral initial value. As with aggregate, the result type of aggregateByKey does not have to match the value type of the RDD. Because aggregateByKey aggregates the values of each key, it still returns a pair RDD holding each key with its aggregated value, whereas aggregate returns a plain value rather than an RDD; this difference is worth noting. Three aggregateByKey prototypes are defined, but they all end up calling the same aggregateByKey implementation. The parameter zeroValue is the initial value, partitioner is the partitioning function, seqOp merges a value into the running result within a partition, starting from the initial value, and combOp merges the partial results.
Example:
// In words, this test program groups and merges the values of the data set by key.
// seq merges each value with the given initial value and keeps the larger one; comb then decides how the partial results are merged.
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
int numPartitions = 4;
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
final Random random = new Random(100);
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
    }
});
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + javaPairRDD.collect());
JavaPairRDD<Integer, Integer> aggregateByKeyRDD = javaPairRDD.aggregateByKey(3, numPartitions, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        System.out.println("seq~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + v1 + "," + v2);
        return Math.max(v1, v2);
    }
}, new Function2<Integer, Integer, Integer>() {
    int i = 0;
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        System.out.println("comb~~~~~~~~~i~~~~~~~~~~~~~~~~~~~" + i++);
        System.out.println("comb~~~~~~~~~v1~~~~~~~~~~~~~~~~~~~" + v1);
        System.out.println("comb~~~~~~~~~v2~~~~~~~~~~~~~~~~~~~" + v2);
        return v1 + v2;
    }
});
System.out.println("aggregateByKeyRDD.partitions().size()~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + aggregateByKeyRDD.partitions().size());
System.out.println("aggregateByKeyRDD~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + aggregateByKeyRDD.collect());
cogroup
Official documentation:
For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.
Function prototypes:
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W])]
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])]
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], other3: JavaPairRDD[K, W3], partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])]
def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])]
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])]
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], other3: JavaPairRDD[K, W3]): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])]
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JIterable[V], JIterable[W])]
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])]
def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], other3: JavaPairRDD[K, W3], numPartitions: Int): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])]
Source code analysis:
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("Default partitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
    }
  }
}
override def getPartitions: Array[Partition] = {
  val array = new Array[Partition](part.numPartitions)
  for (i <- 0 until array.length) {
    // Each CoGroupPartition will have a dependency per contributing RDD
    array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
      // Assume each RDD contributed a single dependency, and get it
      dependencies(j) match {
        case s: ShuffleDependency[_, _, _] =>
          None
        case _ =>
          Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
      }
    }.toArray)
  }
  array
}
Which partition of the CoGroupedRDD a cogroup() result is placed in is determined by the partitioner set by the user (HashPartitioner by default).
All RDDs that the CoGroupedRDD depends on are put into the array rdds[RDD]. Then, for each i, if the CoGroupedRDD has a OneToOneDependency on rdds(i), Dependency[i] = new OneToOneDependency(rdd); otherwise it is new ShuffleDependency(rdd). Finally, the array deps[Dependency] of dependencies on each parent RDD is returned.
getParents(partition id) in the Dependency class returns, for a given partition, the partitions: List[Int] of the parent RDD that it depends on under that dependency.
getPartitions() specifies how many partitions the RDD has and how each partition is serialized.
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        return new Tuple2<Integer, Integer>(integer, 1);
    }
});
// Unlike groupByKey(), cogroup() aggregates two or more RDDs.
JavaPairRDD<Integer, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD = javaPairRDD.cogroup(javaPairRDD);
System.out.println(cogroupRDD.collect());
JavaPairRDD<Integer, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD3 = javaPairRDD.cogroup(javaPairRDD, new Partitioner() {
    @Override
    public int numPartitions() {
        return 2;
    }
    @Override
    public int getPartition(Object key) {
        return (key.toString()).hashCode() % numPartitions();
    }
});
System.out.println(cogroupRDD3);
join
Official documentation:
Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and (k, v2) is in `other`. Performs a hash join across the cluster.
Function prototypes:
def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)]
def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)]
def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)]
Source code analysis:
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
The source shows that join() combines two RDD[(K, V)] in the manner of a SQL join. As in intersection(), cogroup() is performed first; flatMapValues then pairs every value v from `this` with every value w from `other` for the same key and flattens the result into (k, (v, w)) records.
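The cogroup-then-pair structure of join can be reproduced on small in-memory lists. The plain-Java sketch below is an illustration only: `JoinSketch`, `entry`, `cogroup`, and `join` are hypothetical names, Map.Entry stands in for a key-value record, and the joined records are rendered as strings to keep the example short.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class JoinSketch {
    static Map.Entry<Integer, String> entry(int key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // cogroup: key -> [values from left, values from right]
    static Map<Integer, List<List<String>>> cogroup(
            List<Map.Entry<Integer, String>> left, List<Map.Entry<Integer, String>> right) {
        Map<Integer, List<List<String>>> groups = new TreeMap<>();
        for (Map.Entry<Integer, String> e : left) {
            slot(groups, e.getKey()).get(0).add(e.getValue());
        }
        for (Map.Entry<Integer, String> e : right) {
            slot(groups, e.getKey()).get(1).add(e.getValue());
        }
        return groups;
    }

    private static List<List<String>> slot(Map<Integer, List<List<String>>> groups, Integer key) {
        List<List<String>> pair = groups.get(key);
        if (pair == null) {
            pair = Arrays.<List<String>>asList(new ArrayList<String>(), new ArrayList<String>());
            groups.put(key, pair);
        }
        return pair;
    }

    // join: per key, pair every left value with every right value and flatten.
    static List<String> join(
            List<Map.Entry<Integer, String>> left, List<Map.Entry<Integer, String>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<List<String>>> g : cogroup(left, right).entrySet()) {
            for (String v : g.getValue().get(0)) {
                for (String w : g.getValue().get(1)) {
                    out.add("(" + g.getKey() + ",(" + v + "," + w + "))");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> left = Arrays.asList(entry(1, "a"), entry(2, "b"), entry(2, "c"));
        List<Map.Entry<Integer, String>> right = Arrays.asList(entry(2, "x"), entry(3, "y"));
        System.out.println(join(left, right));
    }
}
```

Keys 1 and 3 have values on one side only, so their nested loops produce nothing and only key 2 appears in the result.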
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
final Random random = new Random();
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
    }
});
JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD = javaPairRDD.join(javaPairRDD);
System.out.println(joinRDD.collect());
JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD2 = javaPairRDD.join(javaPairRDD, 2);
System.out.println(joinRDD2.collect());
JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD3 = javaPairRDD.join(javaPairRDD, new Partitioner() {
    @Override
    public int numPartitions() {
        return 2;
    }
    @Override
    public int getPartition(Object key) {
        return (key.toString()).hashCode() % numPartitions();
    }
});
System.out.println(joinRDD3.collect());
fullOuterJoin
Official documentation:
Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each element (k, w) in `other`, the resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements in `this` have key k. Uses the given Partitioner to partition the output RDD.
Function prototypes:
def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])]
def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], Optional[W])]
def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (Optional[V], Optional[W])]
Source code analysis:
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Option[V], Option[W]))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues {
    case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
    case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
    case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
  }
}
The source shows that fullOuterJoin() is similar to join(): it first performs cogroup() to obtain a MappedValuesRDD of type <K, (Iterable[V1], Iterable[V2])>, then takes the Cartesian product of Iterable[V1] and Iterable[V2], using None on whichever side (V1 or V2) has no value for the key, and flattens the result.
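The three cases of the pattern match can be traced on already grouped values. The plain-Java sketch below is an illustration only: `FullOuterJoinSketch` is a hypothetical name, the two maps stand in for the per-key groups that cogroup would produce, and java.util.Optional is used as a stand-in for the Optional type of the Spark Java API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

public class FullOuterJoinSketch {
    static String row(Integer key, Optional<String> v, Optional<String> w) {
        return "(" + key + ",(" + v + "," + w + "))";
    }

    static List<String> fullOuterJoin(Map<Integer, List<String>> left, Map<Integer, List<String>> right) {
        Set<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        List<String> out = new ArrayList<>();
        for (Integer key : keys) {
            List<String> vs = left.getOrDefault(key, Collections.<String>emptyList());
            List<String> ws = right.getOrDefault(key, Collections.<String>emptyList());
            if (ws.isEmpty()) {
                // key only on the left: the right side becomes empty
                for (String v : vs) out.add(row(key, Optional.of(v), Optional.<String>empty()));
            } else if (vs.isEmpty()) {
                // key only on the right: the left side becomes empty
                for (String w : ws) out.add(row(key, Optional.<String>empty(), Optional.of(w)));
            } else {
                // key on both sides: Cartesian product of the two groups
                for (String v : vs) {
                    for (String w : ws) out.add(row(key, Optional.of(v), Optional.of(w)));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> left = new HashMap<>();
        left.put(1, Arrays.asList("a"));
        left.put(2, Arrays.asList("b"));
        Map<Integer, List<String>> right = new HashMap<>();
        right.put(2, Arrays.asList("x"));
        right.put(3, Arrays.asList("y"));
        for (String r : fullOuterJoin(left, right)) {
            System.out.println(r);
        }
    }
}
```

Dropping the second branch gives the behavior of a left outer join and dropping the first gives a right outer join, in which case the side that is always present no longer needs to be wrapped in an Optional.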
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
final Random random = new Random();
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
    }
});
// Full outer join
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullJoinRDD = javaPairRDD.fullOuterJoin(javaPairRDD);
System.out.println(fullJoinRDD);
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullJoinRDD1 = javaPairRDD.fullOuterJoin(javaPairRDD, 2);
System.out.println(fullJoinRDD1);
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullJoinRDD2 = javaPairRDD.fullOuterJoin(javaPairRDD, new Partitioner() {
    @Override
    public int numPartitions() {
        return 2;
    }
    @Override
    public int getPartition(Object key) {
        return (key.toString()).hashCode() % numPartitions();
    }
});
System.out.println(fullJoinRDD2);
leftOuterJoin
Official documentation:
Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to partition the output RDD.
Function prototypes:
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])]
def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])]
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, Optional[W])]
Source code analysis:
def leftOuterJoin[W](
    other: RDD[(K, W)],
    partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._2.isEmpty) {
      pair._1.iterator.map(v => (v, None))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
    }
  }
}
The source shows that leftOuterJoin() is similar to fullOuterJoin(): it first performs cogroup() to obtain a MappedValuesRDD of type <K, (Iterable[V1], Iterable[V2])>, then takes the Cartesian product of Iterable[V1] and Iterable[V2], using None on the V2 side when `other` has no value for the key, and flattens the result.
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
final Random random = new Random();
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
    }
});
// Left outer join
JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftJoinRDD = javaPairRDD.leftOuterJoin(javaPairRDD);
System.out.println(leftJoinRDD);
JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftJoinRDD1 = javaPairRDD.leftOuterJoin(javaPairRDD, 2);
System.out.println(leftJoinRDD1);
JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftJoinRDD2 = javaPairRDD.leftOuterJoin(javaPairRDD, new Partitioner() {
    @Override
    public int numPartitions() {
        return 2;
    }
    @Override
    public int getPartition(Object key) {
        return (key.toString()).hashCode() % numPartitions();
    }
});
System.out.println(leftJoinRDD2);
rightOuterJoin
Official documentation:
Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to partition the output RDD.
Function prototypes:
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)]
def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)]
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (Optional[V], W)]
Source code analysis:
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Option[V], W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._1.isEmpty) {
      pair._2.iterator.map(w => (None, w))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
    }
  }
}
The source shows that rightOuterJoin() is similar to fullOuterJoin(): it first performs cogroup() to obtain a MappedValuesRDD of type <K, (Iterable[V1], Iterable[V2])>, then takes the Cartesian product of Iterable[V1] and Iterable[V2], using None on the V1 side when `this` has no value for the key, and flattens the result.
Example:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
final Random random = new Random();
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
    }
});
// Right outer join
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightJoinRDD = javaPairRDD.rightOuterJoin(javaPairRDD);
System.out.println(rightJoinRDD);
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightJoinRDD1 = javaPairRDD.rightOuterJoin(javaPairRDD, 2);
System.out.println(rightJoinRDD1);
JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightJoinRDD2 = javaPairRDD.rightOuterJoin(javaPairRDD, new Partitioner() {
    @Override
    public int numPartitions() {
        return 2;
    }
    @Override
    public int getPartition(Object key) {
        return (key.toString()).hashCode() % numPartitions();
    }
});
System.out.println(rightJoinRDD2);
sortByKey
Official documentation:
Sort the RDD by key, so that each partition contains a sorted range of the elements in ascending order. Calling `collect` or `save` on the resulting RDD will return or output an ordered list of records (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in order of the keys).
Function prototypes:
    def sortByKey(): JavaPairRDD[K, V]
    def sortByKey(ascending: Boolean): JavaPairRDD[K, V]
    def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
    def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V]
    def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V]
    def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
Source code analysis:
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
        : RDD[(K, V)] = self.withScope {
      val part = new RangePartitioner(numPartitions, self, ascending)
      new ShuffledRDD[K, V, V](self, part)
        .setKeyOrdering(if (ascending) ordering else ordering.reverse)
    }
sortByKey() sorts the records of an RDD[(K, V)] by key; ascending = true means ascending order and false means descending order. The data dependency is simple: a shuffle first gathers the records into the partition that covers their key range (chosen by the RangePartitioner), and the records inside each partition are then sorted by key, so the records of the resulting RDD are in order. In the source shown above, the sort is requested through setKeyOrdering() on the ShuffledRDD. The original description (the result being a MapPartitionsRDD, with each partition's records first stored in an Array and then sorted) appears to refer to an older implementation.
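The two steps described above (range partitioning, then sorting inside each partition) can be sketched in plain Java. This is only an illustration under stated assumptions: RangeSortSketch, partitionFor and sortByRange are hypothetical names, and the range bounds are supplied by hand, whereas Spark's RangePartitioner derives them from the data.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch: range-partition integer keys, then sort each partition. */
final class RangeSortSketch {

    /** Returns the first partition whose upper bound is >= key; larger keys go to the last one. */
    static int partitionFor(int key, int[] upperBounds) {
        int p = 0;
        while (p < upperBounds.length && key > upperBounds[p]) {
            p++;
        }
        return p;
    }

    /** upperBounds must be ascending; the result has upperBounds.length + 1 partitions. */
    static List<List<Integer>> sortByRange(List<Integer> keys, int[] upperBounds) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i <= upperBounds.length; i++) {
            partitions.add(new ArrayList<Integer>());
        }
        // "Shuffle": send every key to the partition that covers its range.
        for (int key : keys) {
            partitions.get(partitionFor(key, upperBounds)).add(key);
        }
        // Sort inside each partition; reading the partitions in order is then globally sorted.
        for (List<Integer> partition : partitions) {
            Collections.sort(partition);
        }
        return partitions;
    }
}
```

For the keys 1, 2, 4, 3, 5, 6, 7 and the hand-picked bounds {2, 5}, the sketch produces [[1, 2], [3, 4, 5], [6, 7]]. Because the partitions cover disjoint, increasing key ranges, concatenating them gives a totally ordered result.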
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    final Random random = new Random(100);
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
        }
    });

    // Sort by key in ascending order (the default)
    JavaPairRDD<Integer, Integer> sortByKeyRDD = javaPairRDD.sortByKey();
    System.out.println(sortByKeyRDD.collect());
repartitionAndSortWithinPartitions
Official documentation:
Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling `repartition` and then sorting within each partition because it can push the sorting down into the shuffle machinery.
Function prototypes:
    def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V]
    def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K]): JavaPairRDD[K, V]
Source code analysis:
    def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
      new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
    }
As the source shows, this method repartitions the RDD according to the given partitioner and sorts the records by key within each resulting partition. Compared with sortByKey, the only difference is that the caller supplies the partitioner instead of a RangePartitioner. This approach is more efficient than repartitioning first and then sorting each partition, because the sort is folded into the shuffle phase.
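A plain-Java sketch of the same idea follows, assuming a simple hash-style partitioner. The names RepartitionSortSketch, nonNegativeMod and repartitionAndSort are hypothetical. The sketch also shows why a custom getPartition() should not return hashCode() % numPartitions directly: in Java the remainder of a negative hash is negative, which is not a valid partition index.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch: hash-partition integer keys, then sort inside each partition. */
final class RepartitionSortSketch {

    /** Like hash % numPartitions, but never negative. */
    static int nonNegativeMod(int hash, int numPartitions) {
        int raw = hash % numPartitions;
        return raw < 0 ? raw + numPartitions : raw;
    }

    static List<List<Integer>> repartitionAndSort(List<Integer> keys, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<Integer>());
        }
        // Partition step: the caller-chosen partitioner decides where each key goes.
        for (Integer key : keys) {
            partitions.get(nonNegativeMod(key.hashCode(), numPartitions)).add(key);
        }
        // Sort step: each partition is ordered, but there is no ordering across partitions.
        for (List<Integer> partition : partitions) {
            Collections.sort(partition);
        }
        return partitions;
    }
}
```

For the keys 1, 2, 4, 3, 5, 6, 7, -3 and two partitions, the result is [[2, 4, 6], [-3, 1, 3, 5, 7]]: every partition is sorted, yet the concatenation is not, which is the difference from sortByKey.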
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    final Random random = new Random();
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
        }
    });

    // Repartition with a custom partitioner and sort by key inside each partition
    JavaPairRDD<Integer, Integer> repartitionAndSortWithinPartitionsRDD = javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
        @Override
        public int numPartitions() {
            return 2;
        }
        @Override
        public int getPartition(Object key) {
            // Keep the index non-negative even when hashCode() is negative
            int n = numPartitions();
            return (key.toString().hashCode() % n + n) % n;
        }
    });
    System.out.println(repartitionAndSortWithinPartitionsRDD.collect());
combineByKey
Official documentation:
Generic function to combine the elements for each key using a custom set of aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C. Note that V and C can be different -- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
- `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
- `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
- `mergeCombiners`, to combine two C's into a single one.
In addition, users can control the partitioning of the output RDD, and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).
Function prototypes:
    def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C]
    def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], numPartitions: Int): JavaPairRDD[K, C]
    def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner): JavaPairRDD[K, C]
    def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner, mapSideCombine: Boolean, serializer: Serializer): JavaPairRDD[K, C]
This function converts an RDD[(K, V)] into an RDD[(K, C)], where the types V and C may be the same or different.
The parameters are:
- createCombiner: converts a value of type V from the input RDD[(K, V)] into the type C of the output RDD[(K, C)];
- mergeValue: merge function that folds a value of type V into an accumulated value of type C; it takes (C, V) and returns C;
- mergeCombiners: merge function that combines two values of type C into one; it takes (C, C) and returns C;
- numPartitions: the number of partitions used by the default HashPartitioner;
- partitioner: the partitioning function; HashPartitioner by default;
- mapSideCombine: a flag that decides whether to combine on the map side, similar to the combiner in MapReduce; it defaults to true.
Source code analysis:
    def combineByKey[C](createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)] = self.withScope {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("Default partitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }
As the source shows, combineByKey() aggregates records while they are being computed. Suppose a group of <K, V> records with the same key K flows into combineByKey() one by one. createCombiner initializes c from the value of the first record (for example, c = value). From the second record on, every incoming record updates c through mergeValue(c, record.value); to sum all the values, for example, use c = c + record.value. Once all records have passed through mergeValue(), the result is c. Now suppose a second group of records with the same key arrives (for example, from another partition) and combineByKey() computes c' for it in the same way. The combined result for both groups is then final c = mergeCombiners(c, c'). Finally, the per-partition results are merged according to the partitioner.
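The interplay of the three functions can be sketched in plain Java. This is an illustration only, not Spark's Aggregator: CombineByKeySketch, combinePartition and mergePartitions are hypothetical names, each list stands for the records of one partition, and a TreeMap is used merely to get a stable print order.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Plain-Java sketch of the createCombiner / mergeValue / mergeCombiners flow. */
final class CombineByKeySketch {

    /** Combines the records of one partition: first value -> createCombiner, later values -> mergeValue. */
    static <K, V, C> Map<K, C> combinePartition(List<Map.Entry<K, V>> records,
                                                Function<V, C> createCombiner,
                                                BiFunction<C, V, C> mergeValue) {
        Map<K, C> combined = new TreeMap<>();
        for (Map.Entry<K, V> record : records) {
            C c = combined.get(record.getKey());
            combined.put(record.getKey(),
                    c == null ? createCombiner.apply(record.getValue())
                              : mergeValue.apply(c, record.getValue()));
        }
        return combined;
    }

    /** Merges the combiners of two partitions key by key with mergeCombiners. */
    static <K, C> Map<K, C> mergePartitions(Map<K, C> first, Map<K, C> second,
                                            BiFunction<C, C, C> mergeCombiners) {
        Map<K, C> merged = new TreeMap<>(first);
        for (Map.Entry<K, C> entry : second.entrySet()) {
            merged.merge(entry.getKey(), entry.getValue(), mergeCombiners);
        }
        return merged;
    }
}
```

With summation as all three functions, a partition holding (a,1), (a,2), (b,3) combines to {a=3, b=3}; merging it with a second partition holding (a,4) gives {a=7, b=3}.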
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    // Convert to a pair RDD
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, 1);
        }
    });

    // Each function tags its output so that the printed result shows which one ran
    JavaPairRDD<Integer, String> combineByKeyRDD = javaPairRDD.combineByKey(new Function<Integer, String>() {
        @Override
        public String call(Integer v1) throws Exception {
            return v1 + " :createCombiner: ";
        }
    }, new Function2<String, Integer, String>() {
        @Override
        public String call(String v1, Integer v2) throws Exception {
            return v1 + " :mergeValue: " + v2;
        }
    }, new Function2<String, String, String>() {
        @Override
        public String call(String v1, String v2) throws Exception {
            return v1 + " :mergeCombiners: " + v2;
        }
    });
    System.out.println("result~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + combineByKeyRDD.collect());
groupByKey
Official documentation:
Group the values for each key in the RDD into a single sequence. Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
Note: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
Function prototypes:
    def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]]
    def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]]
Source code analysis:
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
      // groupByKey shouldn't use map side combine because map side combine does not
      // reduce the amount of data shuffled and requires all map side data be inserted
      // into a hash table, leading to more objects in the old gen.
      val createCombiner = (v: V) => CompactBuffer(v)
      val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
      val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
      val bufs = combineByKey[CompactBuffer[V]](
        createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }
As the source shows, groupByKey() is built on combineByKey(). It only gathers the records that share a key, which a single simple shuffle can accomplish. The compute() of ShuffledRDD merely fetches the data that belongs to each partition; a mapPartitions() step then performs the aggregation and produces a MapPartitionsRDD, at which point groupByKey() is done. Finally, to give callers a uniform return type, the CompactBuffer that holds each key's values is exposed as an Iterable. groupByKey() does not combine on the map side (mapSideCombine = false), because a map-side combine would only save the space taken by duplicate keys within a partition and would not reduce the number of values shuffled. groupByKey() itself offers no switch for this; when duplicate keys are very numerous and map-side combining is wanted, combineByKey() can be called directly with mapSideCombine = true.
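The grouping itself amounts to appending each value to a per-key buffer. The following plain-Java sketch uses an ArrayList in place of Spark's CompactBuffer; GroupByKeySketch and its groupByKey method are hypothetical names, and the comments map each step to the function it corresponds to in the source above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of grouping values by key with a per-key buffer. */
final class GroupByKeySketch {

    static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> records) {
        Map<K, List<V>> groups = new TreeMap<>();
        for (Map.Entry<K, V> record : records) {
            List<V> buffer = groups.get(record.getKey());
            if (buffer == null) {
                // createCombiner: start a new buffer for a key seen for the first time
                buffer = new ArrayList<>();
                groups.put(record.getKey(), buffer);
            }
            // mergeValue: append the value to the key's buffer
            buffer.add(record.getValue());
        }
        return groups;
    }
}
```

Every value is kept, so the buffers grow with the input. This is why the official note above recommends reduceByKey or aggregateByKey when the goal is an aggregate rather than the full list of values.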
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    // Convert to (K, V) format
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, 1);
        }
    });

    // Group with an explicit number of partitions
    JavaPairRDD<Integer, Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey(2);
    System.out.println(groupByKeyRDD.collect());

    // Group with a custom partitioner
    JavaPairRDD<Integer, Iterable<Integer>> groupByKeyRDD3 = javaPairRDD.groupByKey(new Partitioner() {
        // Number of partitions
        @Override
        public int numPartitions() {
            return 10;
        }
        // Partitioning rule; kept non-negative even when hashCode() is negative
        @Override
        public int getPartition(Object o) {
            int n = numPartitions();
            return (o.toString().hashCode() % n + n) % n;
        }
    });
    System.out.println(groupByKeyRDD3.collect());
reduceByKey
Official documentation:
Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
Function prototypes:
    def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V]
    def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V]
This function uses a reduce function to merge the values V that belong to each key K.
The parameters are:
- func: the reduce function, defined by the user as needed;
- partitioner: the partitioning function;
- numPartitions: the number of partitions; the default partitioning function is HashPartitioner.
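Source code analysis:
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
      combineByKey[V]((v: V) => v, func, func, partitioner)
    }
As the source shows, reduceByKey() is built on combineByKey(): createCombiner is the identity conversion, while mergeValue and mergeCombiners are both the user-defined function. reduceByKey() corresponds to traditional MapReduce, and its data flow is essentially the same as in Hadoop. Because combineByKey() enables the map-side combine by default, reduceByKey() also combines on the map side by default. The values are therefore combined within each partition before the shuffle (described here as a mapPartitions step producing a MapPartitionsRDD), the shuffle then yields a ShuffledRDD, and the reduce step (aggregate plus mapPartitions()) produces the final result. In the combineByKey() source shown earlier, both steps are configured on the ShuffledRDD through setAggregator() and setMapSideCombine().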
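The effect of using one function for both phases can be sketched in plain Java. ReduceByKeySketch, reducePartition and reduceByKey are hypothetical names, and each inner list stands for the records of one map-side partition.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

/** Plain-Java sketch: the same function reduces inside a partition and across partitions. */
final class ReduceByKeySketch {

    /** Map-side combine: at most one record per key leaves the partition. */
    static <K, V> Map<K, V> reducePartition(List<Map.Entry<K, V>> records, BinaryOperator<V> func) {
        Map<K, V> reduced = new TreeMap<>();
        for (Map.Entry<K, V> record : records) {
            // The first value of a key is stored as is (identity createCombiner);
            // later values are merged with func (mergeValue).
            reduced.merge(record.getKey(), record.getValue(), func);
        }
        return reduced;
    }

    /** Reduce side: merge the partial results of all partitions with the same func (mergeCombiners). */
    static <K, V> Map<K, V> reduceByKey(List<List<Map.Entry<K, V>>> partitions, BinaryOperator<V> func) {
        Map<K, V> result = new TreeMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            for (Map.Entry<K, V> partial : reducePartition(partition, func).entrySet()) {
                result.merge(partial.getKey(), partial.getValue(), func);
            }
        }
        return result;
    }
}
```

For the partitions [(1,1), (2,1), (1,1)] and [(1,1), (3,1)] with addition as func, the result is {1=3, 2=1, 3=1}. The first partition forwards two partial records instead of three, which is the saving a map-side combine provides; it is only valid because func is associative.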
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    // Convert to (K, V) format
    JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
        @Override
        public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
            return new Tuple2<Integer, Integer>(integer, 1);
        }
    });

    // Default partitioning
    JavaPairRDD<Integer, Integer> reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    System.out.println(reduceByKeyRDD.collect());

    // Explicit numPartitions
    JavaPairRDD<Integer, Integer> reduceByKeyRDD2 = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    }, 2);
    System.out.println(reduceByKeyRDD2.collect());

    // Custom partitioner
    JavaPairRDD<Integer, Integer> reduceByKeyRDD4 = javaPairRDD.reduceByKey(new Partitioner() {
        @Override
        public int numPartitions() {
            return 2;
        }
        @Override
        public int getPartition(Object o) {
            // Keep the index non-negative even when hashCode() is negative
            int n = numPartitions();
            return (o.toString().hashCode() % n + n) % n;
        }
    }, new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    System.out.println(reduceByKeyRDD4.collect());
foldByKey
Official documentation:
Merge the values for each key using an associative function and a neutral "zero value" which may be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
Function prototypes:
    def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V]
    def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V]
    def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V]
This function folds and merges the values V of each key K with the given function; the zeroValue parameter provides the initial value of the fold.
The parameters are:
- zeroValue: the initial value;
- numPartitions: the number of partitions; the default partitioning function is HashPartitioner;
- partitioner: the partitioning function;
- func: the user-defined merge function.
Source code analysis:
    def foldByKey(
        zeroValue: V,
        partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
      // Serialize the zero value to a byte array so that we can get a new clone of it on each key
      val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
      val zeroArray = new Array[Byte](zeroBuffer.limit)
      zeroBuffer.get(zeroArray)

      // When deserializing, use a lazy val to create just one instance of the serializer per task
      lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
      val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

      val cleanedFunc = self.context.clean(func)
      combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
    }
As the foldByKey() implementation shows, this function is built on combineByKey(): createCombiner merely folds zeroValue into the first V of a key, while mergeValue and mergeCombiners are both the user-defined function. When aggregating the values of a key, choose the initial value with care so that it does not change the result of the aggregation.
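Why the zero value must be neutral can be seen in a small plain-Java sketch. It assumes, as the createCombiner above suggests, that the zero value is folded in once for each key in each partition when values are combined per partition first. FoldByKeySketch and its foldByKey method are hypothetical names, and each inner list stands for one partition.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

/** Plain-Java sketch: the zero value enters once per key and per partition. */
final class FoldByKeySketch {

    static <K, V> Map<K, V> foldByKey(List<List<Map.Entry<K, V>>> partitions,
                                      V zero, BinaryOperator<V> func) {
        Map<K, V> result = new TreeMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            Map<K, V> local = new TreeMap<>();
            for (Map.Entry<K, V> record : partition) {
                V acc = local.get(record.getKey());
                // First value of a key in this partition: func(zero, v); afterwards: func(acc, v)
                local.put(record.getKey(), func.apply(acc == null ? zero : acc, record.getValue()));
            }
            // Merge the per-partition results with the same function
            for (Map.Entry<K, V> partial : local.entrySet()) {
                result.merge(partial.getKey(), partial.getValue(), func);
            }
        }
        return result;
    }
}
```

Summing the values 2 and 3 of one key with zero value 0 gives 5 whether the two records share a partition or not. With the non-neutral zero value 10, one partition gives 15 but two partitions give 25, so the result would depend on how the data happens to be partitioned.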
Example:
    List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
    final Random rand = new Random(10);
    JavaPairRDD<Integer, String> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, String>() {
        @Override
        public Tuple2<Integer, String> call(Integer integer) throws Exception {
            return new Tuple2<Integer, String>(integer, Integer.toString(rand.nextInt(10)));
        }
    });

    // Default partitioning
    JavaPairRDD<Integer, String> foldByKeyRDD = javaPairRDD.foldByKey("X", new Function2<String, String, String>() {
        @Override
        public String call(String v1, String v2) throws Exception {
            return v1 + ":" + v2;
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD.collect());

    // Explicit numPartitions
    JavaPairRDD<Integer, String> foldByKeyRDD1 = javaPairRDD.foldByKey("X", 2, new Function2<String, String, String>() {
        @Override
        public String call(String v1, String v2) throws Exception {
            return v1 + ":" + v2;
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD1.collect());

    // Custom partitioner
    JavaPairRDD<Integer, String> foldByKeyRDD2 = javaPairRDD.foldByKey("X", new Partitioner() {
        @Override
        public int numPartitions() {
            return 3;
        }
        @Override
        public int getPartition(Object key) {
            // Keep the index non-negative even when hashCode() is negative
            int n = numPartitions();
            return (key.toString().hashCode() % n + n) % n;
        }
    }, new Function2<String, String, String>() {
        @Override
        public String call(String v1, String v2) throws Exception {
            return v1 + ":" + v2;
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD2.collect());
zipPartitions
Official documentation:
Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions. Assumes that all the RDDs have the same number of partitions, but does not require them to have the same number of elements in each partition.
Function prototypes:
    def zipPartitions[U, V](
        other: JavaRDDLike[U, _],
        f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V]
This function combines two RDDs partition by partition to form a new RDD.
Source code analysis:
    def zipPartitions[B: ClassTag, V: ClassTag]
        (rdd2: RDD[B], preservesPartitioning: Boolean)
        (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
      new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
    }

    private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
        sc: SparkContext,
        var f: (Iterator[A], Iterator[B]) => Iterator[V],
        var rdd1: RDD[A],
        var rdd2: RDD[B],
        preservesPartitioning: Boolean = false)
      extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {

      override def compute(s: Partition, context: TaskContext): Iterator[V] = {
        val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
        f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
      }

      override def clearDependencies() {
        super.clearDependencies()
        rdd1 = null
        rdd2 = null
        f = null
      }
    }
As the source shows, zipPartitions creates a ZippedPartitionsRDD2, which extends ZippedPartitionsBaseRDD. The getPartitions method of ZippedPartitionsBaseRDD checks that the RDDs being combined have the same number of partitions, but the implementation does not require each partition to contain the same number of elements.
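The pairing of partitions can be sketched in plain Java, with nested lists standing in for the partitions of two RDDs. ZipPartitionsSketch and its zipPartitions method are hypothetical names, and the exception type and message are illustrative only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-Java sketch: apply f to the i-th partitions of two partitioned collections. */
final class ZipPartitionsSketch {

    static <A, B, V> List<List<V>> zipPartitions(List<List<A>> first, List<List<B>> second,
            BiFunction<Iterator<A>, Iterator<B>, List<V>> f) {
        // Only the number of partitions has to match, not the number of elements.
        if (first.size() != second.size()) {
            throw new IllegalArgumentException("unequal numbers of partitions");
        }
        List<List<V>> result = new ArrayList<>();
        for (int i = 0; i < first.size(); i++) {
            result.add(f.apply(first.get(i).iterator(), second.get(i).iterator()));
        }
        return result;
    }
}
```

If f joins elements pairwise as "a_b" while both iterators have elements, the partitions [[5, 1], [1, 4], [4, 2, 2]] and [[3, 2], [12, 5], [6, 1]] produce [[5_3, 1_2], [1_12, 4_5], [4_6, 2_1]]; the unmatched last element of the third partition is simply dropped by that particular f.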
Example:
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1);
    JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1, 3);

    // Join the elements of matching partitions pairwise as "a_b"
    JavaRDD<String> zipPartitionsRDD = javaRDD.zipPartitions(javaRDD1, new FlatMapFunction2<Iterator<Integer>, Iterator<Integer>, String>() {
        @Override
        public Iterable<String> call(Iterator<Integer> integerIterator, Iterator<Integer> integerIterator2) throws Exception {
            LinkedList<String> linkedList = new LinkedList<String>();
            while (integerIterator.hasNext() && integerIterator2.hasNext()) {
                linkedList.add(integerIterator.next().toString() + "_" + integerIterator2.next().toString());
            }
            return linkedList;
        }
    });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipPartitionsRDD.collect());
zip
Official documentation:
Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
Function prototypes:
    def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]
This function combines two RDDs into a single RDD of key/value pairs.
Source code analysis:
    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
      zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
        new Iterator[(T, U)] {
          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
            case (true, true) => true
            case (false, false) => false
            case _ => throw new SparkException("Can only zip RDDs with " +
              "same number of elements in each partition")
          }
          def next(): (T, U) = (thisIter.next(), otherIter.next())
        }
      }
    }
As the source shows, zip is built on zipPartitions with preservesPartitioning set to false (preservesPartitioning indicates whether to keep the parent RDD's partitioner). In addition, the two RDDs must have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.
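The strict pairing iterator in the source can be mirrored in plain Java. The class StrictZipIterator below is a hypothetical name, Map.Entry stands in for a Scala tuple, and IllegalStateException stands in for SparkException.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;

/** Plain-Java sketch of a zip iterator that rejects inputs of different length. */
final class StrictZipIterator<T, U> implements Iterator<Map.Entry<T, U>> {
    private final Iterator<T> left;
    private final Iterator<U> right;

    StrictZipIterator(Iterator<T> left, Iterator<U> right) {
        this.left = left;
        this.right = right;
    }

    @Override
    public boolean hasNext() {
        boolean leftHasNext = left.hasNext();
        boolean rightHasNext = right.hasNext();
        if (leftHasNext != rightHasNext) {
            // One side ran out before the other: the partitions differ in size.
            throw new IllegalStateException(
                    "Can only zip RDDs with same number of elements in each partition");
        }
        return leftHasNext;
    }

    @Override
    public Map.Entry<T, U> next() {
        return new AbstractMap.SimpleImmutableEntry<>(left.next(), right.next());
    }
}
```

Note that the mismatch is only detected when the shorter side is exhausted, so a consumer may already have received some pairs before the exception is raised.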
Example:
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1, 7);
    // Use the same number of partitions as javaRDD; zip requires matching partition counts
    JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1, 3);
    JavaPairRDD<Integer, Integer> zipRDD = javaRDD.zip(javaRDD1);
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipRDD.collect());
zipWithIndex
Official documentation:
Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type. This method needs to trigger a spark job when this RDD contains more than one partitions.
Function prototypes:
    def zipWithIndex(): JavaPairRDD[T, JLong]
This function pairs each element of the RDD with its index in the RDD, producing an RDD of key/value pairs.
Source code analysis:
    def zipWithIndex(): RDD[(T, Long)] = withScope {
      new ZippedWithIndexRDD(this)
    }

    // In ZippedWithIndexRDD:
    /** The start index of each partition. */
    @transient private val startIndices: Array[Long] = {
      val n = prev.partitions.length
      if (n == 0) {
        Array[Long]()
      } else if (n == 1) {
        Array(0L)
      } else {
        prev.context.runJob(
          prev,
          Utils.getIteratorSize _,
          0 until n - 1, // do not need to count the last partition
          allowLocal = false
        ).scanLeft(0L)(_ + _)
      }
    }

    override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
      val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
      firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
        (x._1, split.startIndex + x._2)
      }
    }
As the source shows, this function returns a ZippedWithIndexRDD. ZippedWithIndexRDD computes startIndices, the index of the first element of each partition, by counting the elements of all partitions except the last (this is the job mentioned in the documentation). compute() then uses Scala's zipWithIndex to number the elements within a partition and adds the partition's start index.
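The start-index computation is a running sum over the partition sizes. A plain-Java sketch, with ZipWithIndexSketch, startIndices and indexOf as hypothetical names and the partition sizes passed in directly instead of being counted by a job:

```java
/** Plain-Java sketch of how a global index is derived from per-partition start indices. */
final class ZipWithIndexSketch {

    /** start[i] is the total size of partitions 0..i-1; the last partition's size is never needed. */
    static long[] startIndices(int[] partitionSizes) {
        long[] start = new long[partitionSizes.length];
        for (int i = 1; i < partitionSizes.length; i++) {
            start[i] = start[i - 1] + partitionSizes[i - 1];
        }
        return start;
    }

    /** Global index of the element at the given zero-based position inside a partition. */
    static long indexOf(long[] start, int partition, int positionInPartition) {
        return start[partition] + positionInPartition;
    }
}
```

For partition sizes {2, 2, 3} the start indices are [0, 2, 4], so the last element of the last partition (position 2) gets index 6, the largest index, as the documentation states.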
Example:
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    // Pair every element with its index in the RDD
    JavaPairRDD<Integer, Long> zipWithIndexRDD = javaRDD.zipWithIndex();
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithIndexRDD.collect());
zipWithUniqueId
Official documentation:
Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
Function prototypes:
    def zipWithUniqueId(): JavaPairRDD[T, JLong]
This function pairs each element of the RDD with a unique ID. The first element of each partition gets the partition index as its ID, and the N-th element of a partition (counting from 0) gets the ID (N * total number of partitions of the RDD) + (partition index).
Source code analysis:
    def zipWithUniqueId(): RDD[(T, Long)] = withScope {
      val n = this.partitions.length.toLong
      this.mapPartitionsWithIndex { case (k, iter) =>
        iter.zipWithIndex.map { case (item, i) =>
          (item, i * n + k)
        }
      }
    }
As the source shows, zipWithUniqueId() uses mapPartitionsWithIndex() to obtain the partition index k of each element and computes the ID as i * n + k, where i is the element's position within its partition and n is the number of partitions.
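The ID formula can be tried out in plain Java. ZipWithUniqueIdSketch, uniqueId and idsFor are hypothetical names; only the partition sizes are needed, since the IDs do not depend on the element values.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the i * n + k id scheme. */
final class ZipWithUniqueIdSketch {

    /** Id of the element at zero-based position i in partition k, with n partitions in total. */
    static long uniqueId(long positionInPartition, long numPartitions, long partitionIndex) {
        return positionInPartition * numPartitions + partitionIndex;
    }

    /** Lists the ids handed out in each partition, given only the partition sizes. */
    static List<List<Long>> idsFor(int[] partitionSizes) {
        long n = partitionSizes.length;
        List<List<Long>> ids = new ArrayList<>();
        for (int k = 0; k < partitionSizes.length; k++) {
            List<Long> partitionIds = new ArrayList<>();
            for (long i = 0; i < partitionSizes[k]; i++) {
                partitionIds.add(uniqueId(i, n, k));
            }
            ids.add(partitionIds);
        }
        return ids;
    }
}
```

For partition sizes {2, 2, 3} the IDs are [[0, 3], [1, 4], [2, 5, 8]]. The IDs 6 and 7 are never used, which illustrates the gaps mentioned in the documentation; in exchange, no partition needs to know the size of any other, so no job is required.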
Example:
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    // Pair every element with a unique id (i * n + k)
    JavaPairRDD<Integer, Long> zipWithUniqueIdRDD = javaRDD.zipWithUniqueId();
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithUniqueIdRDD.collect());