久久精品国产精品国产精品污,男人扒开添女人下部免费视频,一级国产69式性姿势免费视频,夜鲁夜鲁很鲁在线视频 视频,欧美丰满少妇一区二区三区,国产偷国产偷亚洲高清人乐享,中文 在线 日韩 亚洲 欧美,熟妇人妻无乱码中文字幕真矢织江,一区二区三区人妻制服国产

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Spark Java API:Transformation

發布時間:2024/1/17 java 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Java API:Transformation 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

mapPartitions


官方文檔描述:

Return a new RDD by applying a function to each partition of this RDD.

mapPartitions函數會對每個分區依次調用分區函數處理,然后將處理的結果(若干個Iterator)生成新的RDDs。?
mapPartitions與map類似,但是如果在映射的過程中需要頻繁創建額外的對象,使用mapPartitions要比map高效的過。比如,將RDD中的所有數據通過JDBC連接寫入數據庫,如果使用map函數,可能要為每一個元素都創建一個connection,這樣開銷很大,如果使用mapPartitions,那么只需要針對每一個分區建立一個connection。

函數原型:

def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],preservesPartitioning: Boolean): JavaRDD[U]

第一個函數是基于第二個函數實現的,使用的是preservesPartitioning為false。而第二個函數我們可以指定preservesPartitioning,preservesPartitioning表示是否保留父RDD的partitioner分區信息;FlatMapFunction中的Iterator是這個rdd的一個分區的所有element組成的Iterator。

實例

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); //RDD有兩個分區 JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,2); //計算每個分區的合計 JavaRDD<Integer> mapPartitionsRDD = javaRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() { @Overridepublic Iterable<Integer> call(Iterator<Integer> integerIterator) throws Exception {int isum = 0;while(integerIterator.hasNext())isum += integerIterator.next();LinkedList<Integer> linkedList = new LinkedList<Integer>();linkedList.add(isum);return linkedList; } });System.out.println("mapPartitionsRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsRDD.collect());

mapPartitionsWithIndex


官方文檔說明:

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

mapPartitionsWithIndex與mapPartition基本相同,只是在處理函數的參數是一個二元元組,元組的第一個元素是當前處理的分區的index,元組的第二個元素是當前處理的分區元素組成的Iterator。

函數原型:

def mapPartitionsWithIndex[R]( f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],preservesPartitioning: Boolean = false): JavaRDD[R]

源碼分析:

def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning)} def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter), preservesPartitioning)}

從源碼中可以看到其實mapPartition已經獲得了當前處理的分區的index,只是沒有傳入分區處理函數,而mapPartition將其傳入分區處理函數。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); //RDD有兩個分區 JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,2); //分區index、元素值、元素編號輸出 JavaRDD<String> mapPartitionsWithIndexRDD = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {@Override public Iterator<String> call(Integer v1, Iterator<Integer> v2) throws Exception { LinkedList<String> linkedList = new LinkedList<String>(); int i = 0; while (v2.hasNext()) linkedList.add(Integer.toString(v1) + "|" + v2.next().toString() + Integer.toString(i++)); return linkedList.iterator(); } },false);System.out.println("mapPartitionsWithIndexRDD~~~~~~~~~~~~~~~~~~~~~~" + mapPartitionsWithIndexRDD.collect(

?

?

sample


官方文檔描述:

Return a sampled subset of this RDD.

返回抽樣的樣本的子集。

函數原型:

  • withReplacement can elements be sampled multiple times (replaced when sampled out)
  • fraction expected size of the sample as a fraction of this RDD’s size
  • without replacement: probability that each element is chosen; fraction must be [0, 1]
  • with replacement: expected number of times each element is chosen; fraction must be >= 0
def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T]
  • withReplacement can elements be sampled multiple times (replaced when sampled out)
  • fraction expected size of the sample as a fraction of this RDD’s size
  • without replacement: probability that each element is chosen; fraction must be [0, 1]
  • with replacement: expected number of times each element is chosen; fraction must be >= 0
  • seed seed for the random number generator
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T]

第一函數是基于第二個實現的,在第一個函數中seed為Utils.random.nextLong;其中,withReplacement是建立不同的采樣器;fraction為采樣比例;seed為隨機生成器的種子。

源碼分析:

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] = withScope { require(fraction >= 0.0, "Negative fraction value: " + fraction) if (withReplacement) { new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed) } else { new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed) } }

sample函數中,首先對fraction進行驗證;再次建立PartitionwiseSampledRDD,依據withReplacement的值分別建立柏松采樣器或伯努利采樣器。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); //false 是伯努利分布 (元素可以多次采樣);0.2 采樣比例;100 隨機數生成器的種子 JavaRDD<Integer> sampleRDD = javaRDD.sample(false,0.2,100); System.out.println("sampleRDD~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD.collect()); //true 是柏松分布;0.2 采樣比例;100 隨機數生成器的種子 JavaRDD<Integer> sampleRDD1 = javaRDD.sample(false,0.2,100); System.out.println("sampleRDD1~~~~~~~~~~~~~~~~~~~~~~~~~~" + sampleRDD1.collect());

randomSplit


官方文檔描述:

Randomly splits this RDD with the provided weights.

依據所提供的權重對該RDD進行隨機劃分

函數原型:

  • weights for splits, will be normalized if they don’t sum to 1
  • random seed
  • return split RDDs in an array
def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]]
  • weights for splits, will be normalized if they don’t sum to 1
  • return split RDDs in an array
def randomSplit(weights: Array[Double]): Array[JavaRDD[T]]

源碼分析:

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope { val sum = weights.sum val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) normalizedCumWeights.sliding(2).map { x =>randomSampleWithRange(x(0), x(1), seed) }.toArray }def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = { this.mapPartitionsWithIndex( { (index, partition) => val sampler = new BernoulliCellSampler[T](lb, ub) sampler.setSeed(seed + index) sampler.sample(partition) }, preservesPartitioning = true) }

從源碼中可以看到randomSPlit先是對權重數組進行0-1正則化;再利用randomSampleWithRange函數,對RDD進行劃分;而在該函數中調用mapPartitionsWithIndex(上一節有具體說明),建立伯努利采樣器對RDD進行劃分。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); double [] weights = {0.1,0.2,0.7}; //依據所提供的權重對該RDD進行隨機劃分 JavaRDD<Integer> [] randomSplitRDDs = javaRDD.randomSplit(weights); System.out.println("randomSplitRDDs of size~~~~~~~~~~~~~~" + randomSplitRDDs.length); int i = 0; for(JavaRDD<Integer> item:randomSplitRDDs) System.out.println(i++ + " randomSplitRDDs of item~~~~~~~~~~~~~~~~" + item.collect());

?

?

union


官方文檔描述:

Return the union of this RDD and another one. Any identical elements will appear multiple times(use`.distinct()` to eliminate them).

函數原型:

def union(other: JavaRDD[T]): JavaRDD[T]

union() 將兩個 RDD 簡單合并在一起,不改變 partition 里面的數據。RangeDependency 實際上也是 1:1,只是為了訪問 union() 后的 RDD 中的 partition 方便,保留了原始 RDD 的 range 邊界。

實例:

List<Integer> data = Arrays.asList(1,2,4,3,5,6,7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); JavaRDD<Integer> unionRDD = javaRDD.union(javaRDD); System.out.println("unionRDD~~~~~~~~~~~~~~~~~~~~~~" + unionRDD.collect());

intersection


官方文檔描述:

Return the intersection of this RDD and another one.The output will not contain any duplicate elements, even if the input RDDs did.Note that this method performs a shuffle internally.

函數原型:

def intersection(other: JavaRDD[T]): JavaRDD[T]

源碼分析:

def intersection(other: RDD[T]): RDD[T] = withScope { this.map(v => (v, null)).cogroup(other.map(v => (v, null))) .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty } .keys }

先使用 map() 將 RDD[T] 轉變成 RDD[(T, null)],這里的 T 只要不是 Array 等集合類型即可。接著,進行 a.cogroup(b)(后面會詳細介紹cogroup)。之后再使用 filter() 過濾掉 [iter(groupA()), iter(groupB())] 中 groupA 或 groupB 為空的 records,得到 FilteredRDD。最后,使用 keys() 只保留 key 即可,得到 MappedRDD。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); JavaRDD<Integer> intersectionRDD = javaRDD.intersection(javaRDD); System.out.println(intersectionRDD.collect());

?

?

coalesce


官方文檔描述:

Return a new RDD that is reduced into `numPartitions` partitions.

函數原型:

def coalesce(numPartitions: Int): JavaRDD[T]def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]

源碼分析:

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) : RDD[T] = withScope { if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[T]) => { var position = (new Random(index)).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. position = position + 1 (position, t) } } : Iterator[(Int, T)] // include a shuffle step so that our upstream tasks are still distributed new CoalescedRDD(new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), new HashPartitioner(numPartitions)), numPartitions).values } else { new CoalescedRDD(this, numPartitions) } }

從源碼中可以看出,當shuffle=false時,由于不進行shuffle,問題就變成parent RDD中哪些partition可以合并在一起,合并的過程依據設置的numPartitons中的元素個數進行合并處理。當shuffle=true時,進行shuffle操作,原理很簡單,先是對partition中record進行k-v轉換,其中key是由?(new Random(index)).nextInt(numPartitions)+1計算得到,value為record,index 是該 partition 的索引,numPartitions 是 CoalescedRDD 中的 partition 個數,然后 shuffle 后得到 ShuffledRDD, 可以得到均分的 records,再經過復雜算法來建立 ShuffledRDD 和 CoalescedRDD 之間的數據聯系,最后過濾掉 key,得到 coalesce 后的結果 MappedRDD。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); // shuffle默認是false JavaRDD<Integer> coalesceRDD = javaRDD.coalesce(2); System.out.println(coalesceRDD);JavaRDD<Integer> coalesceRDD1 = javaRDD.coalesce(2,true); System.out.println(coalesceRDD1);

注意:

coalesce() 可以將 parent RDD 的 partition 個數進行調整,比如從 5 個減少到 3 個,或者從 5 個增加到 10 個。需要注意的是當 shuffle = false 的時候,是不能增加 partition 個數的(即不能從 5 個變為 10 個)。

repartition


官網文檔描述:

Return a new RDD that has exactly numPartitions partitions. Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using `coalesce`,which can avoid performing a shuffle.

特別需要說明的是,如果使用repartition對RDD的partition數目進行縮減操作,可以使用coalesce函數,將shuffle設置為false,避免shuffle過程,提高效率。

函數原型:

def repartition(numPartitions: Int): JavaRDD[T]

源碼分析:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }

從源碼中可以看到repartition等價于 coalesce(numPartitions, shuffle = true)

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); //等價于 coalesce(numPartitions, shuffle = true) JavaRDD<Integer> repartitionRDD = javaRDD.repartition(2); System.out.println(repartitionRDD);

?

?

cartesian


官方文檔描述:

Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in `this` and b is in `other`.

函數原型:

def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]

源碼分析:

def getPartitions: Array[Partition] = { // create the cross product split val array = new Array[Partition(rdd1.partitions.length * rdd2.partitions.length) for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) { val idx = s1.index * numPartitionsInRdd2 + s2.index array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index) } array }def getDependencies: Seq[Dependency[_]] = List( new NarrowDependency(rdd1) { def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2) }, new NarrowDependency(rdd2) { def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2) } )

Cartesian 對兩個 RDD 做笛卡爾集,生成的 CartesianRDD 中 partition 個數 =?partitionNum(RDD a) * partitionNum(RDD b)。從getDependencies分析可知,這里的依賴關系與前面的不太一樣,CartesianRDD中每個partition依賴兩個parent RDD,而且其中每個 partition 完全依賴(NarrowDependency) RDD a 中一個 partition,同時又完全依賴(NarrowDependency) RDD b 中另一個 partition。具體如下CartesianRDD 中的 partiton i 依賴于(RDD a).List(i / numPartitionsInRDDb)?和?(RDD b).List(i %numPartitionsInRDDb)。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);JavaPairRDD<Integer,Integer> cartesianRDD = javaRDD.cartesian(javaRDD); System.out.println(cartesianRDD.collect());

distinct


官方文檔描述:

Return a new RDD containing the distinct elements in this RDD.

函數原型:

def distinct(): JavaRDD[T]def distinct(numPartitions: Int): JavaRDD[T]

第一個函數是基于第二函數實現的,只是numPartitions默認為partitions.length,partitions為parent RDD的分區。

源碼分析:

def distinct(): RDD[T] = withScope { distinct(partitions.length)}def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) }

distinct() 功能是 deduplicate RDD 中的所有的重復數據。由于重復數據可能分散在不同的 partition 里面,因此需要 shuffle 來進行 aggregate 后再去重。然而,shuffle 要求數據類型是

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);JavaRDD<Integer> distinctRDD1 = javaRDD.distinct(); System.out.println(distinctRDD1.collect()); JavaRDD<Integer> distinctRDD2 = javaRDD.distinct(2); System.out.println(distinctRDD2.collect());

?

?

aggregate


官方文檔描述:

Aggregate the elements of each partition, and then the results for all the partitions,using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce.Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

函數原型:

def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U

源碼分析:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())val cleanSeqOp = sc.clean(seqOp) val cleanCombOp = sc.clean(combOp) val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult) sc.runJob(this, aggregatePartition, mergeResult) jobResult }

aggregate函數將每個分區里面的元素進行聚合,然后用combine函數將每個分區的結果和初始值(zeroValue)進行combine操作。這個函數最終返回U的類型不需要和RDD的T中元素類型一致。 這樣,我們需要一個函數將T中元素合并到U中,另一個函數將兩個U進行合并。其中,參數1是初值元素;參數2是seq函數是與初值進行比較;參數3是comb函數是進行合并 。?
注意:如果沒有指定分區,aggregate是計算每個分區的,空值則用初始值替換。

實例:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3); Integer aggregateValue = javaRDD.aggregate(3, new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("seq~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + v1 + "," + v2); return Math.max(v1, v2); } }, new Function2<Integer, Integer, Integer>() { int i = 0; @Override public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("comb~~~~~~~~~i~~~~~~~~~~~~~~~~~~~"+i++); System.out.println("comb~~~~~~~~~v1~~~~~~~~~~~~~~~~~~~" + v1); System.out.println("comb~~~~~~~~~v2~~~~~~~~~~~~~~~~~~~" + v2); return v1 + v2; } }); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"+aggregateValue);

aggregateByKey


官方文檔描述:

Aggregate the values of each key, using given combine functions and a neutral "zero value".This function can return a different result type, U, than the type of the values in this RDD,V.Thus, we need one operation for merging a V into a U and one operation for merging two U's,as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.

函數原型:

def aggregateByKey[U](zeroValue: U, partitioner: Partitioner, seqFunc: JFunction2[U, V, U],combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] def aggregateByKey[U](zeroValue: U, numPartitions: Int, seqFunc: JFunction2[U, V, U],combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U] def aggregateByKey[U](zeroValue: U, seqFunc: JFunction2[U, V, U], combFunc: JFunction2[U, U, U]): JavaPairRDD[K, U]

源碼分析:

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) // We will clean the combiner closure later in `combineByKey` val cleanedSeqOp = self.context.clean(seqOp) combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner) }

aggregateByKey函數對PairRDD中相同Key的值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值。和aggregate函數類似,aggregateByKey返回值的類型不需要和RDD中value的類型一致。因為aggregateByKey是對相同Key中的值進行聚合操作,所以aggregateByKey函數最終返回的類型還是Pair RDD,對應的結果是Key和聚合好的值;而aggregate函數直接是返回非RDD的結果,這點需要注意。在實現過程中,定義了三個aggregateByKey函數原型,但最終調用的aggregateByKey函數都一致。其中,參數zeroValue代表做比較的初始值;參數partitioner代表分區函數;參數seq代表與初始值比較的函數;參數comb是進行合并的方法。

實例:

//將這個測試程序拿文字做一下描述就是:在data數據集中,按key將value進行分組合并, //合并時在seq函數與指定的初始值進行比較,保留大的值;然后在comb中來處理合并的方式。 List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2); int numPartitions = 4; JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); final Random random = new Random(100); JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,random.nextInt(10)); } }); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"+javaPairRDD.collect());JavaPairRDD<Integer, Integer> aggregateByKeyRDD = javaPairRDD.aggregateByKey(3,numPartitions, new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("seq~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + v1 + "," + v2); return Math.max(v1, v2); } }, new Function2<Integer, Integer, Integer>() { int i = 0; @Override public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("comb~~~~~~~~~i~~~~~~~~~~~~~~~~~~~" + i++); System.out.println("comb~~~~~~~~~v1~~~~~~~~~~~~~~~~~~~" + v1); System.out.println("comb~~~~~~~~~v2~~~~~~~~~~~~~~~~~~~" + v2); return v1 + v2; } }); System.out.println("aggregateByKeyRDD.partitions().size()~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"+aggregateByKeyRDD.partitions().size()); System.out.println("aggregateByKeyRDD~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"+aggregateByKeyRDD.collec

?

?

cogroup


官方文檔描述:

For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.

函數原型:

def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W])]def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])]def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], other3: JavaPairRDD[K, W3], partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])]def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])]def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])]def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], other3: JavaPairRDD[K, W3]): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])]def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JIterable[V], JIterable[W])]def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])]def cogroup[W1, W2, W3](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], other3: JavaPairRDD[K, W3], numPartitions: Int): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3])]

源碼分析:

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) cg.mapValues { case Array(vs, w1s) => (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]]) } }override def getDependencies: Seq[Dependency[_]] = { rdds.map { rdd: RDD[_ <: Product2[K, _]] => if (rdd.partitioner == Some(part)) { logDebug("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) } else { logDebug("Adding shuffle dependency with " + rdd) new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer) } } } override def getPartitions: Array[Partition] = { val array = new Array[Partition](part.numPartitions) for (i <- 0 until array.length) { // Each CoGroupPartition will have a dependency per contributing RDD array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) => // Assume each RDD contributed a single dependency, and get it dependencies(j) match {case s: ShuffleDependency[_, _, _] => None case _ => Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))) } }.toArray) } array }

cogroup() 的計算結果放在 CoGroupedRDD 中哪個 partition 是由用戶設置的 partitioner 確定的(默認是 HashPartitioner)。?
CoGroupedRDD 依賴的所有 RDD 放進數組 rdds[RDD] 中。再次,foreach i,如果 CoGroupedRDD 和 rdds(i) 對應的 RDD 是 OneToOneDependency 關系,那么?Dependecy[i] = new OneToOneDependency(rdd),否則 =?new ShuffleDependency(rdd)。最后,返回與每個 parent RDD 的依賴關系數組 deps[Dependency]。?
Dependency 類中的 getParents(partition id) 負責給出某個 partition 按照該 dependency 所依賴的 parent RDD 中的 partitions: List[Int]。?
getPartitions() 負責給出 RDD 中有多少個 partition,以及每個 partition 如何序列化。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,1); } });//與 groupByKey() 不同,cogroup() 要 aggregate 兩個或兩個以上的 RDD。 JavaPairRDD<Integer,Tuple2<Iterable<Integer>,Iterable<Integer>>> cogroupRDD = javaPairRDD.cogroup(javaPairRDD); System.out.println(cogroupRDD.collect());JavaPairRDD<Integer,Tuple2<Iterable<Integer>,Iterable<Integer>>> cogroupRDD3 = javaPairRDD.cogroup(javaPairRDD, new Partitioner() { @Override public int numPartitions() { return 2; } @Override public int getPartition(Object key) { return (key.toString()).hashCode()%numPartitions();} }); System.out.println(cogroupRDD3);

join


官方文檔描述:

Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and (k, v2) is in `other`. Performs a hash join across the cluster.

函數原型:

def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)]def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)]def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)]

源碼分析:

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) }

從源碼中可以看出,join() 將兩個 RDD[(K, V)] 按照 SQL 中的 join 方式聚合在一起。與 intersection() 類似,首先進行 cogroup(), 得到

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); final Random random = new Random(); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,random.nextInt(10)); } });JavaPairRDD<Integer,Tuple2<Integer,Integer>> joinRDD = javaPairRDD.join(javaPairRDD); System.out.println(joinRDD.collect());JavaPairRDD<Integer,Tuple2<Integer,Integer>> joinRDD2 = javaPairRDD.join(javaPairRDD,2); System.out.println(joinRDD2.collect());JavaPairRDD<Integer,Tuple2<Integer,Integer>> joinRDD3 = javaPairRDD.join(javaPairRDD, new Partitioner() { @Override public int numPartitions() { return 2; } @Override public int getPartition(Object key) { return (key.toString()).hashCode()%numPartitions();} }); System.out.println(joinRDD3.collect());

?

?

fullOuterJoin


官方文檔描述:

Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each element (k, w) in `other`, the resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements in `this` have key k. Uses the given Partitioner to partition the output RDD.

函數原型:

def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])]def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int) : JavaPairRDD[K, (Optional[V], Optional[W])]def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (Optional[V], Optional[W])]

源碼分析:

def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], Option[W]))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { case (vs, Seq()) => vs.iterator.map(v => (Some(v), None)) case (Seq(), ws) => ws.iterator.map(w => (None, Some(w))) case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w)) } }

從源碼中可以看出,fullOuterJoin() 與 join() 類似,首先進行 cogroup(), 得到?<K, (Iterable[V1], Iterable[V2])>?類型的 MappedValuesRDD,然后對 Iterable[V1] 和 Iterable[V2] 做笛卡爾集,注意在V1,V2中添加了None,并將集合 flat() 化。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); final Random random = new Random(); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,random.nextInt(10)); } });//全關聯 JavaPairRDD<Integer,Tuple2<Optional<Integer>,Optional<Integer>>> fullJoinRDD = javaPairRDD.fullOuterJoin(javaPairRDD); System.out.println(fullJoinRDD);JavaPairRDD<Integer,Tuple2<Optional<Integer>,Optional<Integer>>> fullJoinRDD1 = javaPairRDD.fullOuterJoin(javaPairRDD,2); System.out.println(fullJoinRDD1);JavaPairRDD<Integer,Tuple2<Optional<Integer>,Optional<Integer>>> fullJoinRDD2 = javaPairRDD.fullOuterJoin(javaPairRDD, new Partitioner() { @Override public int numPartitions() { return 2; } @Override public int getPartition(Object key) { return (key.toString()).hashCode()%numPartitions(); } }); System.out.println(fullJoinRDD2);

leftOuterJoin


官方文檔描述:

Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to partition the output RDD.

函數原型:

def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])]def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int) : JavaPairRDD[K, (V, Optional[W])]def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, Optional[W])]

源碼分析:

def leftOuterJoin[W]( other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._2.isEmpty) { pair._1.iterator.map(v => (v, None)) } else { for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w)) } } }

從源碼中可以看出,leftOuterJoin() 與 fullOuterJoin() 類似,首先進行 cogroup(), 得到?<K, (Iterable[V1], Iterable[V2])>類型的 MappedValuesRDD,然后對 Iterable[V1] 和 Iterable[V2] 做笛卡爾集,注意在V1中添加了None,并將集合 flat() 化。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); final Random random = new Random(); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,random.nextInt(10)); } });//左關聯 JavaPairRDD<Integer,Tuple2<Integer,Optional<Integer>>> leftJoinRDD = javaPairRDD.leftOuterJoin(javaPairRDD); System.out.println(leftJoinRDD);JavaPairRDD<Integer,Tuple2<Integer,Optional<Integer>>> leftJoinRDD1 = javaPairRDD.leftOuterJoin(javaPairRDD,2); System.out.println(leftJoinRDD1);JavaPairRDD<Integer,Tuple2<Integer,Optional<Integer>>> leftJoinRDD2 = javaPairRDD.leftOuterJoin(javaPairRDD, new Partitioner() { @Override public int numPartitions() { return 2; } @Override public int getPartition(Object key) { return (key.toString()).hashCode()%numPartitions(); } }); System.out.println(leftJoinRDD2);

rightOuterJoin


官方文檔描述:

Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to partition the output RDD.

函數原型:

def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)]def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int) : JavaPairRDD[K, (Optional[V], W)]def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (Optional[V], W)]

源碼分析:

def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._1.isEmpty) { pair._2.iterator.map(w => (None, w)) } else { for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w) } } }

從源碼中可以看出,rightOuterJoin() 與 fullOuterJoin() 類似,首先進行 cogroup(), 得到?<K, (Iterable[V1], Iterable[V2])>?類型的 MappedValuesRDD,然后對 Iterable[V1] 和 Iterable[V2] 做笛卡爾集,注意在V2中添加了None,并將集合 flat() 化。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); final Random random = new Random(); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,random.nextInt(10)); } });//右關聯 JavaPairRDD<Integer,Tuple2<Optional<Integer>,Integer>> rightJoinRDD = javaPairRDD.rightOuterJoin(javaPairRDD); System.out.println(rightJoinRDD);JavaPairRDD<Integer,Tuple2<Optional<Integer>,Integer>> rightJoinRDD1 = javaPairRDD.rightOuterJoin(javaPairRDD,2); System.out.println(rightJoinRDD1);JavaPairRDD<Integer,Tuple2<Optional<Integer>,Integer>> rightJoinRDD2 = javaPairRDD.rightOuterJoin(javaPairRDD, new Partitioner() { @Override public int numPartitions() { return 2; } @Override public int getPartition(Object key) { return (key.toString()).hashCode()%numPartitions(); } }); System.out.println(rightJoinRDD2);

?

?

sortByKey


官方文檔描述:

Sort the RDD by key, so that each partition contains a sorted range of the elements in ascending order. Calling `collect` or `save` on the resulting RDD will return or output an ordered list of records (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in order of the keys).

函數原型:

def sortByKey(): JavaPairRDD[K, V]def sortByKey(ascending: Boolean): JavaPairRDD[K, V]def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V]def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V]def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]

源碼分析:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope{ val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) }

sortByKey() 將 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。目前 sortByKey() 的數據依賴很簡單,先使用 shuffle 將 records 聚集在一起(放到對應的 partition 里面),然后將 partition 內的所有 records 按 key 排序,最后得到的 MapPartitionsRDD 中的 records 就有序了。目前 sortByKey() 先使用 Array 來保存 partition 中所有的 records,再排序。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); final Random random = new Random(100); JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,random.nextInt(10)); } });JavaPairRDD<Integer,Integer> sortByKeyRDD = javaPairRDD.sortByKey(); System.out.println(sortByKeyRDD.collect());

repartitionAndSortWithinPartitions


官方文檔描述:

Repartition the RDD according to the given partitioner and, within each resulting partition,sort records by their keys.This is more efficient than calling `repartition` and then sorting within each partition because it can push the sorting down into the shuffle machinery.

函數原型:

def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V]def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K]) : JavaPairRDD[K, V]

源碼分析:

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope { new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering) }

從源碼中可以看出,該方法依據partitioner對RDD進行分區,并且在每個結果分區中按key進行排序;通過對比sortByKey發現,這種方式比先分區,然后在每個分區中進行排序效率高,這是因為它可以將排序融入到shuffle階段。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); final Random random = new Random();JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,random.nextInt(10)); } });JavaPairRDD<Integer,Integer> RepartitionAndSortWithPartitionsRDD = javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() { @Override public int numPartitions() { return 2; } @Override public int getPartition(Object key) { return key.toString().hashCode() % numPartitions(); } }); System.out.println(RepartitionAndSortWithPartitionsRDD.collect());

?

?

combineByKey


官方文檔描述:

Generic function to combine the elements for each key using a custom set of aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C Note that V and C can be different -- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:- `createCombiner`, which turns a V into a C (e.g., creates a one-element list)- `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)- `mergeCombiners`, to combine two C's into a single one. In addition, users can control the partitioning of the output RDD, and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).

函數原型:

def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C]def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], numPartitions: Int): JavaPairRDD[K, C]def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner): JavaPairRDD[K, C]def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner, mapSideCombine: Boolean,serializer: Serializer): JavaPairRDD[K, C]

該函數是用于將RDD[k,v]轉化為RDD[k,c],其中類型v和類型c可以相同也可以不同。?
其中的參數如下:?
- createCombiner:該函數用于將輸入參數RDD[k,v]的類型V轉化為輸出參數RDD[k,c]中類型C;?
- mergeValue:合并函數,用于將輸入中的類型C的值和類型V的值進行合并,得到類型C,輸入參數是(C,V),輸出是C;?
- mergeCombiners:合并函數,用于將兩個類型C的值合并成一個類型C,輸入參數是(C,C),輸出是C;?
- numPartitions:默認HashPartitioner中partition的個數;?
- partitioner:分區函數,默認是HashPartitionner;?
- mapSideCombine:該函數用于判斷是否需要在map進行combine操作,類似于MapReduce中的combine,默認是 true。

源碼分析:

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C]( self.context.clean(createCombiner), self.context.clean(mergeValue), self.context.clean(mergeCombiners)) if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } }

從源碼中可以看出,combineByKey()的實現是一邊進行aggregate,一邊進行compute() 的基礎操作。假設一組具有相同 K 的?<K, V>?records 正在一個個流向 combineByKey(),createCombiner 將第一個 record 的 value 初始化為 c (比如,c = value),然后從第二個 record 開始,來一個 record 就使用 mergeValue(c, record.value) 來更新 c,比如想要對這些 records 的所有 values 做 sum,那么使用?c = c + record.value。等到 records 全部被 mergeValue(),得到結果 c。假設還有一組 records(key 與前面那組的 key 均相同)一個個到來,combineByKey() 使用前面的方法不斷計算得到 c’。現在如果要求這兩組 records 總的 combineByKey() 后的結果,那么可以使用?final c = mergeCombiners(c, c')?來計算;然后依據partitioner進行不同分區合并。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); //轉化為pairRDD JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,1); } });JavaPairRDD<Integer,String> combineByKeyRDD = javaPairRDD.combineByKey(new Function<Integer, String>() { @Override public String call(Integer v1) throws Exception { return v1 + " :createCombiner: "; }}, new Function2<String, Integer, String>() { @Override public String call(String v1, Integer v2) throws Exception { return v1 + " :mergeValue: " + v2; } }, new Function2<String, String, String>() { @Override public String call(String v1, String v2) throws Exception { return v1 + " :mergeCombiners: " + v2; } }); System.out.println("result~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + combineByKeyRDD.collect());

groupByKey


官方文檔描述:

Group the values for each key in the RDD into a single sequence. Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner. The ordering of elements within each group is not guaranteed,and may even differ each time the resulting RDD is evaluated.Note: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] or [[PairRDDFunctions.reduceByKey]] will provide much better performance.Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].

函數原型:

def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]]def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]]

源碼分析:

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. val createCombiner = (v: V) => CompactBuffer(v) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKey[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] }

從源碼中可以看出groupByKey()是基于combineByKey()實現的, 只是將 Key 相同的 records 聚合在一起,一個簡單的 shuffle 過程就可以完成。ShuffledRDD 中的 compute() 只負責將屬于每個 partition 的數據 fetch 過來,之后使用 mapPartitions() 操作進行 aggregate,生成 MapPartitionsRDD,到這里 groupByKey() 已經結束。最后為了統一返回值接口,將 value 中的 ArrayBuffer[] 數據結構抽象化成 Iterable[]。groupByKey() 沒有在 map 端進行 combine(mapSideCombine = false),這樣設計是因為map 端 combine 只會省掉 partition 里面重復 key 占用的空間;但是,當重復 key 特別多時,可以考慮開啟 combine。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); //轉為k,v格式 JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,1); } });JavaPairRDD<Integer,Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey(2); System.out.println(groupByKeyRDD.collect());//自定義partition JavaPairRDD<Integer,Iterable<Integer>> groupByKeyRDD3 = javaPairRDD.groupByKey(new Partitioner() { //partition各數 @Override public int numPartitions() { return 10; } //partition方式 @Override public int getPartition(Object o) { return (o.toString()).hashCode()%numPartitions(); } }); System.out.println(groupByKeyRDD3.collect());

?

?

reduceByKey


官方文檔描述:

Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.

函數原型:

def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V]def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V]

該函數利用映射函數將每個K對應的V進行運算。?
其中參數說明如下:?
- func:映射函數,根據需求自定義;?
- partitioner:分區函數;?
- numPartitions:分區數,默認的分區函數是HashPartitioner。

源碼分析:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKey[V]((v: V) => v, func, func, partitioner) }

從源碼中可以看出,reduceByKey()是基于combineByKey()實現的,其中createCombiner只是簡單的轉化,而mergeValue和mergeCombiners相同,都是利用用戶自定義函數。reduceyByKey() 相當于傳統的 MapReduce,整個數據流也與 Hadoop 中的數據流基本一樣。在combineByKey()中在 map 端開啟 combine(),因此,reduceyByKey() 默認也在 map 端開啟 combine(),這樣在 shuffle 之前先通過 mapPartitions 操作進行 combine,得到 MapPartitionsRDD, 然后 shuffle 得到 ShuffledRDD,再進行 reduce(通過 aggregate + mapPartitions() 操作來實現)得到 MapPartitionsRDD。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);//轉化為K,V格式 JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(Integer integer) throws Exception { return new Tuple2<Integer, Integer>(integer,1); } }); JavaPairRDD<Integer,Integer> reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); System.out.println(reduceByKeyRDD.collect());//指定numPartitions JavaPairRDD<Integer,Integer> reduceByKeyRDD2 = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } },2); System.out.println(reduceByKeyRDD2.collect());//自定義partition JavaPairRDD<Integer,Integer> reduceByKeyRDD4 = javaPairRDD.reduceByKey(new Partitioner() { @Override public int numPartitions() { return 2; } @Override public int getPartition(Object o) { return (o.toString()).hashCode()%numPartitions(); } }, new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); System.out.println(reduceByKeyRDD4.collect());

foldByKey


官方文檔描述:

Merge the values for each key using an associative function and a neutral "zero value" which may be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).

函數原型:

def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V]def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V]def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V]

該函數用于將K對應V利用函數映射進行折疊、合并處理,其中參數zeroValue是對V進行初始化。?
具體參數如下:?
- zeroValue:初始值;?
- numPartitions:分區數,默認的分區函數是HashPartitioner;?
- partitioner:分區函數;?
- func:映射函數,用戶自定義函數。

源碼分析:

def foldByKey( zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) // When deserializing, use a lazy val to create just one instance of the serializer per task lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray)) val cleanedFunc = self.context.clean(func) combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner) }

從foldByKey()實現可以看出,該函數是基于combineByKey()實現的,其中createCombiner只是利用zeroValue對V進行初始化,而mergeValue和mergeCombiners相同,都是利用用戶自定義函數。在這里需要注意如果實現K的V聚合操作,初始設置需要特別注意,不要改變聚合的結果。

實例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data); final Random rand = new Random(10); JavaPairRDD<Integer,String> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, String>() { @Override public Tuple2<Integer, String> call(Integer integer) throws Exception { return new Tuple2<Integer, String>(integer,Integer.toString(rand.nextInt(10))); } });JavaPairRDD<Integer,String> foldByKeyRDD = javaPairRDD.foldByKey("X", new Function2<String, String, String>() { @Override public String call(String v1, String v2) throws Exception { return v1 + ":" + v2; } }); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD.collect());JavaPairRDD<Integer,String> foldByKeyRDD1 = javaPairRDD.foldByKey("X", 2, new Function2<String, String, String>() { @Override public String call(String v1, String v2) throws Exception { return v1 + ":" + v2; } }); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD1.collect());JavaPairRDD<Integer,String> foldByKeyRDD2 = javaPairRDD.foldByKey("X", new Partitioner() { @Override public int numPartitions() { return 3; } @Override public int getPartition(Object key) { return key.toString().hashCode()%numPartitions(); } }, new Function2<String, String, String>() { @Override public String call(String v1, String v2) throws Exception { return v1 + ":" + v2; } }); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + foldByKeyRDD2.collect());

?

?

zipPartitions


官方文檔描述:

Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions. Assumes that all the RDDs have the same number of partitions, but does not require them to have the same number of elements in each partition.

函數原型:

def zipPartitions[U, V]( other: JavaRDDLike[U, _], f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V]

該函數將兩個分區RDD按照partition進行合并,形成一個新的RDD。

源碼分析:

def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B], preservesPartitioning: Boolean) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope { new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning) }private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]( sc: SparkContext, var f: (Iterator[A], Iterator[B]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], preservesPartitioning: Boolean = false) extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) } override def clearDependencies() { super.clearDependencies() rdd1 = null rdd2 = null f = null } }

從源碼中可以看出,zipPartitions函數生成ZippedPartitionsRDD2,該RDD繼承ZippedPartitionsBaseRDD,在ZippedPartitionsBaseRDD中的getPartitions方法中判斷需要組合的RDD是否具有相同的分區數,但是該RDD實現中并沒有要求每個partitioner內的元素數量相同。

實例:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3); List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1); JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1,3); JavaRDD<String> zipPartitionsRDD = javaRDD.zipPartitions(javaRDD1, new FlatMapFunction2<Iterator<Integer>, Iterator<Integer>, String>() { @Override public Iterable<String> call(Iterator<Integer> integerIterator, Iterator<Integer> integerIterator2) throws Exception { LinkedList<String> linkedList = new LinkedList<String>(); while(integerIterator.hasNext() && integerIterator2.hasNext()) linkedList.add(integerIterator.next().toString() + "_" + integerIterator2.next().toString()); return linkedList; } }); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipPartitionsRDD.collect());

zip


官方文檔描述:

Zips this RDD with another one, returning key-value pairs with the first element in each RDD,second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

函數原型:

def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]

該函數用于將兩個RDD進行組合,組合成一個key/value形式的RDD。

源碼分析:

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope { zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) => new Iterator[(T, U)] { def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match { case (true, true) => true case (false, false) => false case _ => throw new SparkException("Can only zip RDDs with " + "same number of elements in each partition") } def next(): (T, U) = (thisIter.next(), otherIter.next()) } } }

從源碼中可以看出,zip函數是基于zipPartitions實現的,其中preservesPartitioning為false,preservesPartitioning表示是否保留父RDD的partitioner分區;另外,兩個RDD的partition數量及元數的數量都是相同的,否則會拋出異常。

實例:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3); List<Integer> data1 = Arrays.asList(3,2,12,5,6,1,7); JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1); JavaPairRDD<Integer,Integer> zipRDD = javaRDD.zip(javaRDD1); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipRDD.collect());

?

?

zipWithIndex


官方文檔描述:

Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.This method needs to trigger a spark job when this RDD contains more than one partitions.

函數原型:

def zipWithIndex(): JavaPairRDD[T, JLong]

該函數將RDD中的元素和這個元素在RDD中的indices組合起來,形成鍵/值對的RDD。

源碼分析:

def zipWithIndex(): RDD[(T, Long)] = withScope { new ZippedWithIndexRDD(this) }/** The start index of each partition. */ @transient private val startIndices: Array[Long] = { val n = prev.partitions.length if (n == 0) { Array[Long]() } else if (n == 1) { Array(0L) } else { prev.context.runJob( prev, Utils.getIteratorSize _, 0 until n - 1, // do not need to count the last partition allowLocal = false ).scanLeft(0L)(_ + _) } }override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = { val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition] firstParent[T].iterator(split.prev, context).zipWithIndex.map { x => (x._1, split.startIndex + x._2) } }

從源碼中可以看出,該函數返回ZippedWithIndexRDD,在ZippedWithIndexRDD中通過計算startIndices獲得index;然后在compute函數中利用scala的zipWithIndex計算index。

實例:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3); List<Integer> data1 = Arrays.asList(3,2,12,5,6,1,7); JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1); JavaPairRDD<Integer,Long> zipWithIndexRDD = javaRDD.zipWithIndex(); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithIndexRDD.collect());

zipWithUniqueId


官方文檔描述:

Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].

函數原型:

def zipWithUniqueId(): JavaPairRDD[T, JLong]

該函數將RDD中的元素和一個對應的唯一ID組合成鍵值對,其中ID的生成算法是每個分區的第一元素的ID是該分區索引號,每個分區中的第N個元素的ID是(N * 該RDD總的分區數) + (該分區索引號)。

源碼分析:

def zipWithUniqueId(): RDD[(T, Long)] = withScope { val n = this.partitions.length.toLong this.mapPartitionsWithIndex { case (k, iter) => iter.zipWithIndex.map { case (item, i) => (item, i * n + k) } } }

*從源碼中可以看出,zipWithUniqueId()函數是利用mapPartitionsWithIndex()函數獲得每個元素的分區索引號,同時利用(i*n + k)進行相應的計算。

實例:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2); JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3); List<Integer> data1 = Arrays.asList(3,2,12,5,6,1,7); JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1); JavaPairRDD<Integer,Long> zipWithIndexRDD = javaRDD.zipWithUniqueId(); System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithIndexRDD.collect());

總結

以上是生活随笔為你收集整理的Spark Java API:Transformation的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

亚洲男人av香蕉爽爽爽爽 | 国产综合久久久久鬼色 | 中文字幕无码免费久久99 | 国产精品鲁鲁鲁 | 最近中文2019字幕第二页 | 精品国产国产综合精品 | 欧美人与禽猛交狂配 | 国产激情一区二区三区 | 色综合久久中文娱乐网 | 高清不卡一区二区三区 | 国产绳艺sm调教室论坛 | 国产亚洲精品久久久久久久久动漫 | av无码电影一区二区三区 | 又紧又大又爽精品一区二区 | 日韩欧美中文字幕在线三区 | 午夜理论片yy44880影院 | 亚洲色成人中文字幕网站 | 国产麻豆精品精东影业av网站 | 国产婷婷色一区二区三区在线 | 伊人久久婷婷五月综合97色 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 日本一本二本三区免费 | 啦啦啦www在线观看免费视频 | 国产明星裸体无码xxxx视频 | 亚洲一区二区三区 | 成人aaa片一区国产精品 | 久久久婷婷五月亚洲97号色 | 久久精品99久久香蕉国产色戒 | 国产综合色产在线精品 | 漂亮人妻洗澡被公强 日日躁 | 久久久久99精品成人片 | 日本精品高清一区二区 | 国产明星裸体无码xxxx视频 | 九九久久精品国产免费看小说 | 色欲人妻aaaaaaa无码 | 免费视频欧美无人区码 | 熟妇人妻无乱码中文字幕 | 亚洲成在人网站无码天堂 | 亚洲精品久久久久久一区二区 | 18黄暴禁片在线观看 | 国产亲子乱弄免费视频 | 亚洲男人av天堂午夜在 | 亚洲一区二区观看播放 | 久久精品国产大片免费观看 | 中文字幕日产无线码一区 | 日本成熟视频免费视频 | 免费人成在线视频无码 | 久久久久成人精品免费播放动漫 | 久久国内精品自在自线 | 国产午夜福利亚洲第一 | 国产精品亚洲а∨无码播放麻豆 | 婷婷六月久久综合丁香 | 亚洲一区二区三区无码久久 | 国产精品国产自线拍免费软件 | 波多野结衣高清一区二区三区 | 300部国产真实乱 | 国产偷自视频区视频 | 亚洲精品国产精品乱码视色 | 久久99精品国产麻豆蜜芽 | 在线a亚洲视频播放在线观看 | 国色天香社区在线视频 | 国产真人无遮挡作爱免费视频 | 久久国产劲爆∧v内射 | 成在人线av无码免费 | 国产精品-区区久久久狼 | 亚洲毛片av日韩av无码 | 国产无套内射久久久国产 | 久久午夜无码鲁丝片 | 少妇人妻偷人精品无码视频 | 中文毛片无遮挡高清免费 | 久久久无码中文字幕久... | 国产精品国产三级国产专播 | 性生交片免费无码看人 | 国产美女精品一区二区三区 | 无码乱肉视频免费大全合集 | 给我免费的视频在线观看 | 国产激情一区二区三区 | 亚洲精品鲁一鲁一区二区三区 | 国产精品亚洲一区二区三区喷水 | 国产日产欧产精品精品app | 97精品人妻一区二区三区香蕉 | 久久久中文字幕日本无吗 | 蜜桃无码一区二区三区 | 国产在线无码精品电影网 | av香港经典三级级 在线 | 国产精品无码mv在线观看 | 亚洲色成人中文字幕网站 | 强伦人妻一区二区三区视频18 | 久久久久亚洲精品男人的天堂 | 内射白嫩少妇超碰 | 日本va欧美va欧美va精品 | 天堂无码人妻精品一区二区三区 | 亚洲一区二区三区偷拍女厕 | 青草视频在线播放 | 日韩精品无码一本二本三本色 | 国产亚洲精品久久久久久大师 | 国产在线精品一区二区高清不卡 | 国产sm调教视频在线观看 | 欧美性生交活xxxxxdddd | 色婷婷av一区二区三区之红樱桃 | 日本va欧美va欧美va精品 | 色婷婷综合中文久久一本 | 无码人妻久久一区二区三区不卡 | 中文字幕+乱码+中文字幕一区 | 九九久久精品国产免费看小说 | 久久亚洲a片com人成 | 国产乱人伦av在线无码 | 精品国精品国产自在久国产87 | 国产乱人伦偷精品视频 | 久久综合给合久久狠狠狠97色 | 巨爆乳无码视频在线观看 | 亚洲天堂2017无码中文 | 亚洲国产精品久久久久久 | 中文字幕乱码中文乱码51精品 | 国产肉丝袜在线观看 | 精品国产av色一区二区深夜久久 | 狠狠躁日日躁夜夜躁2020 | 亚洲精品一区三区三区在线观看 | 无码任你躁久久久久久久 | 久久午夜夜伦鲁鲁片无码免费 | av在线亚洲欧洲日产一区二区 | 黄网在线观看免费网站 | 18黄暴禁片在线观看 | 亚洲日韩av一区二区三区中文 | 黑人巨大精品欧美一区二区 | 学生妹亚洲一区二区 | 久久久久亚洲精品男人的天堂 | 大乳丰满人妻中文字幕日本 | 亚洲综合另类小说色区 | 午夜无码人妻av大片色欲 | 久久精品中文字幕一区 | 亚洲欧美国产精品久久 | 偷窥村妇洗澡毛毛多 | 久久久精品欧美一区二区免费 | 2020久久超碰国产精品最新 | 又大又紧又粉嫩18p少妇 | 高清无码午夜福利视频 | 亚洲色大成网站www | 亚洲人成网站色7799 | 色婷婷综合激情综在线播放 | aⅴ亚洲 日韩 色 图网站 播放 | 真人与拘做受免费视频一 | 久久亚洲日韩精品一区二区三区 | 久激情内射婷内射蜜桃人妖 | 久热国产vs视频在线观看 | 久久综合给合久久狠狠狠97色 | 国产人妻大战黑人第1集 | 扒开双腿疯狂进出爽爽爽视频 | 亚洲色大成网站www | 日韩 欧美 动漫 国产 制服 | 色妞www精品免费视频 | 欧美熟妇另类久久久久久不卡 | 四虎国产精品一区二区 | 日韩精品久久久肉伦网站 | 亚洲精品成人福利网站 | 日本精品人妻无码77777 天堂一区人妻无码 | 亚洲中文字幕在线无码一区二区 | 熟妇人妻无码xxx视频 | 亚洲乱亚洲乱妇50p | 国产精品久久久久影院嫩草 | 精品国精品国产自在久国产87 | 乱中年女人伦av三区 | 亚洲欧美日韩国产精品一区二区 | 亚洲の无码国产の无码步美 | 日本精品少妇一区二区三区 | 女人色极品影院 | 国精品人妻无码一区二区三区蜜柚 | 欧美阿v高清资源不卡在线播放 | 高清无码午夜福利视频 | 成人免费无码大片a毛片 | 2019nv天堂香蕉在线观看 | 高清无码午夜福利视频 | 精品国产青草久久久久福利 | 亚洲日韩乱码中文无码蜜桃臀网站 | 2020最新国产自产精品 | 亚洲日本一区二区三区在线 | 亚洲精品综合五月久久小说 | 亚洲国产欧美日韩精品一区二区三区 | 一本精品99久久精品77 | 亚洲成a人片在线观看无码3d | 久久久久久久人妻无码中文字幕爆 | 丰满妇女强制高潮18xxxx | 国产精品久久国产精品99 | 男女性色大片免费网站 | 无码午夜成人1000部免费视频 | 少妇邻居内射在线 | 日本高清一区免费中文视频 | 人人妻人人澡人人爽欧美精品 | 露脸叫床粗话东北少妇 | 精品日本一区二区三区在线观看 | 国产va免费精品观看 | 欧美丰满熟妇xxxx | 日本xxxx色视频在线观看免费 | 精品一二三区久久aaa片 | 欧美放荡的少妇 | 亚洲综合另类小说色区 | 亚洲成熟女人毛毛耸耸多 | 日韩亚洲欧美精品综合 | 国模大胆一区二区三区 | 国产 浪潮av性色四虎 | 久久久久99精品成人片 | 大乳丰满人妻中文字幕日本 | 中文无码精品a∨在线观看不卡 | 国产人妻人伦精品1国产丝袜 | 亚洲精品午夜无码电影网 | 久久精品人人做人人综合 | 亚洲第一无码av无码专区 | 久久99精品久久久久久动态图 | 高潮毛片无遮挡高清免费 | 波多野结衣av在线观看 | 国产精品丝袜黑色高跟鞋 | 天干天干啦夜天干天2017 | 国产片av国语在线观看 | 国产激情无码一区二区 | 国产熟妇另类久久久久 | 国产亚洲欧美日韩亚洲中文色 | 国产精品高潮呻吟av久久4虎 | 人妻aⅴ无码一区二区三区 | 久久久久人妻一区精品色欧美 | 欧美性生交xxxxx久久久 | 天海翼激烈高潮到腰振不止 | 国产精品国产自线拍免费软件 | 国产成人午夜福利在线播放 | 欧美性生交xxxxx久久久 | 日本www一道久久久免费榴莲 | 日本xxxx色视频在线观看免费 | 久久www免费人成人片 | 少妇厨房愉情理9仑片视频 | 久久五月精品中文字幕 | 亚洲国产精品久久久久久 | 亚洲精品久久久久中文第一幕 | 欧美日韩亚洲国产精品 | 人人妻在人人 | 国产精品内射视频免费 | 亚洲国产欧美在线成人 | 久久久国产精品无码免费专区 | 日韩 欧美 动漫 国产 制服 | 色欲综合久久中文字幕网 | 中文字幕无码日韩专区 | 亚洲色在线无码国产精品不卡 | 俺去俺来也在线www色官网 | 久久精品国产亚洲精品 | 国产精品igao视频网 | 无码精品国产va在线观看dvd | 亚洲中文字幕无码中文字在线 | 人人妻人人澡人人爽精品欧美 | 日日碰狠狠丁香久燥 | 欧美肥老太牲交大战 | 中文亚洲成a人片在线观看 | 日韩av无码一区二区三区 | 中文字幕日韩精品一区二区三区 | 国产精品亚洲一区二区三区喷水 | 国产乡下妇女做爰 | 亚洲国产精品毛片av不卡在线 | 免费观看又污又黄的网站 | 中文字幕无码日韩欧毛 | 亚洲综合在线一区二区三区 | 欧美一区二区三区 | 又大又硬又爽免费视频 | 在线播放免费人成毛片乱码 | 少妇性荡欲午夜性开放视频剧场 | 最近的中文字幕在线看视频 | 精品人人妻人人澡人人爽人人 | 国产凸凹视频一区二区 | 天天躁夜夜躁狠狠是什么心态 | 欧美精品免费观看二区 | 亚洲色欲久久久综合网东京热 | 一本精品99久久精品77 | 对白脏话肉麻粗话av | 夜夜夜高潮夜夜爽夜夜爰爰 | 亚洲成a人片在线观看无码3d | 久久视频在线观看精品 | 亚洲精品www久久久 | 国产香蕉97碰碰久久人人 | 亚洲欧美国产精品久久 | 女人色极品影院 | 性欧美熟妇videofreesex | 牲欲强的熟妇农村老妇女 | 十八禁视频网站在线观看 | 久久午夜无码鲁丝片 | 久久久av男人的天堂 | 日韩无码专区 | 18黄暴禁片在线观看 | 久久久久成人精品免费播放动漫 | 小泽玛莉亚一区二区视频在线 | 免费观看的无遮挡av | 女人被男人躁得好爽免费视频 | 狠狠cao日日穞夜夜穞av | 亚洲性无码av中文字幕 | 中文久久乱码一区二区 | 亚洲国产精品久久人人爱 | 亚洲男女内射在线播放 | 国产精品-区区久久久狼 | 精品无人区无码乱码毛片国产 | 秋霞成人午夜鲁丝一区二区三区 | 狠狠色噜噜狠狠狠狠7777米奇 | 亚洲乱亚洲乱妇50p | 四虎永久在线精品免费网址 | 精品少妇爆乳无码av无码专区 | 国产精品久久久久影院嫩草 | 一本久久a久久精品vr综合 | 久青草影院在线观看国产 | 亚洲精品欧美二区三区中文字幕 | 日韩人妻系列无码专区 | 亚洲日本va中文字幕 | 无码毛片视频一区二区本码 | 亚洲精品久久久久中文第一幕 | 无码人妻丰满熟妇区毛片18 | 国产熟妇另类久久久久 | 久久人人爽人人人人片 | 午夜精品久久久久久久久 | 国产午夜无码视频在线观看 | 性做久久久久久久久 | 精品国产一区二区三区四区 | 狠狠噜狠狠狠狠丁香五月 | 精品国产麻豆免费人成网站 | 女人被男人爽到呻吟的视频 | 国产真实夫妇视频 | 亚洲国产精品美女久久久久 | 18禁黄网站男男禁片免费观看 | 久久99精品久久久久婷婷 | 特黄特色大片免费播放器图片 | 亚洲国产日韩a在线播放 | 国内丰满熟女出轨videos | 国产在线aaa片一区二区99 | 免费人成在线视频无码 | 99麻豆久久久国产精品免费 | 精品夜夜澡人妻无码av蜜桃 | 欧美兽交xxxx×视频 | 成人欧美一区二区三区 | 中文字幕+乱码+中文字幕一区 | 99久久无码一区人妻 | 国产午夜亚洲精品不卡 | 日韩成人一区二区三区在线观看 | 国产熟妇另类久久久久 | 性做久久久久久久久 | 国产真实伦对白全集 | 日本va欧美va欧美va精品 | 精品无码国产一区二区三区av | 天天燥日日燥 | 狂野欧美性猛xxxx乱大交 | 国模大胆一区二区三区 | 亚洲男人av香蕉爽爽爽爽 | 欧美国产日产一区二区 | 成人精品天堂一区二区三区 | 麻豆国产人妻欲求不满谁演的 | a片免费视频在线观看 | 无码国产乱人伦偷精品视频 | 亚洲日韩av片在线观看 | 对白脏话肉麻粗话av | 亚洲毛片av日韩av无码 | 人妻少妇精品视频专区 | 国产一区二区三区精品视频 | 久久亚洲中文字幕精品一区 | 国产亚洲精品久久久久久 | 亚洲色在线无码国产精品不卡 | 亚洲日韩乱码中文无码蜜桃臀网站 | 欧美丰满老熟妇xxxxx性 | 人人妻人人藻人人爽欧美一区 | 国产乱码精品一品二品 | 丰满少妇熟乱xxxxx视频 | 免费无码午夜福利片69 | 中文字幕久久久久人妻 | 久久人人97超碰a片精品 | 久久久精品456亚洲影院 | 国产精品毛多多水多 | 欧美zoozzooz性欧美 | 久久综合色之久久综合 | а天堂中文在线官网 | 大地资源网第二页免费观看 | 国产成人无码午夜视频在线观看 | 人妻少妇被猛烈进入中文字幕 | 又湿又紧又大又爽a视频国产 | 初尝人妻少妇中文字幕 | 天堂а√在线中文在线 | 在线观看国产午夜福利片 | 亚洲熟女一区二区三区 | 国产精品久久久av久久久 | 国产xxx69麻豆国语对白 | 婷婷综合久久中文字幕蜜桃三电影 | 国产两女互慰高潮视频在线观看 | 男人的天堂2018无码 | 国产精品.xx视频.xxtv | 亚洲国产欧美国产综合一区 | 国产精品怡红院永久免费 | 色综合天天综合狠狠爱 | 亚洲人亚洲人成电影网站色 | 亚洲成a人片在线观看日本 | 性色av无码免费一区二区三区 | 亚洲熟悉妇女xxx妇女av | 天堂一区人妻无码 | 久久精品国产日本波多野结衣 | 人妻互换免费中文字幕 | 成人欧美一区二区三区 | 日韩精品无码一区二区中文字幕 | 日本在线高清不卡免费播放 | 久久久久久亚洲精品a片成人 | 天天躁夜夜躁狠狠是什么心态 | 亚洲a无码综合a国产av中文 | 强伦人妻一区二区三区视频18 | 97无码免费人妻超级碰碰夜夜 | 欧美日韩一区二区免费视频 | 国产无遮挡又黄又爽又色 | 人妻互换免费中文字幕 | 亚洲精品国产精品乱码不卡 | 少妇的肉体aa片免费 | 99精品视频在线观看免费 | 国产精品无码一区二区桃花视频 | 九月婷婷人人澡人人添人人爽 | 欧美黑人巨大xxxxx | 人人爽人人爽人人片av亚洲 | 亚洲乱码日产精品bd | 色综合天天综合狠狠爱 | 欧美性猛交xxxx富婆 | 红桃av一区二区三区在线无码av | 精品国产青草久久久久福利 | 99麻豆久久久国产精品免费 | 无码人妻丰满熟妇区毛片18 | 学生妹亚洲一区二区 | 精品国产一区av天美传媒 | 少妇高潮一区二区三区99 | 日韩精品成人一区二区三区 | 婷婷丁香五月天综合东京热 | 2019午夜福利不卡片在线 | 最近中文2019字幕第二页 | 荫蒂被男人添的好舒服爽免费视频 | 亚洲精品欧美二区三区中文字幕 | 久久午夜无码鲁丝片午夜精品 | 国产色视频一区二区三区 | 国产一区二区三区精品视频 | 亚洲熟熟妇xxxx | 国产激情无码一区二区app | 一本加勒比波多野结衣 | 亚洲国产av精品一区二区蜜芽 | 中文字幕乱码中文乱码51精品 | 国产精品免费大片 | 国产精品二区一区二区aⅴ污介绍 | 内射欧美老妇wbb | 天天拍夜夜添久久精品大 | 欧美性黑人极品hd | 国产激情艳情在线看视频 | 暴力强奷在线播放无码 | 免费无码的av片在线观看 | 97色伦图片97综合影院 | 大胆欧美熟妇xx | 在线a亚洲视频播放在线观看 | 久久国语露脸国产精品电影 | 老太婆性杂交欧美肥老太 | а√天堂www在线天堂小说 | 九九在线中文字幕无码 | 久久精品国产精品国产精品污 | 国产精品美女久久久久av爽李琼 | 少妇愉情理伦片bd | 丰腴饱满的极品熟妇 | 狠狠亚洲超碰狼人久久 | 在线观看国产午夜福利片 | 欧洲欧美人成视频在线 | 18禁止看的免费污网站 | 国内丰满熟女出轨videos | 亚洲成av人在线观看网址 | 日本一区二区三区免费高清 | 国产精品内射视频免费 | 人人澡人人妻人人爽人人蜜桃 | 久久人妻内射无码一区三区 | 日本熟妇乱子伦xxxx | 丝袜 中出 制服 人妻 美腿 | 中文字幕无码av激情不卡 | 日本又色又爽又黄的a片18禁 | 综合人妻久久一区二区精品 | 亚洲日本在线电影 | 亚洲人成无码网www | 97精品国产97久久久久久免费 | 日本精品人妻无码77777 天堂一区人妻无码 | 精品 日韩 国产 欧美 视频 | 日本精品人妻无码77777 天堂一区人妻无码 | 久久99精品久久久久久动态图 | 欧洲美熟女乱又伦 | 高清国产亚洲精品自在久久 | 97久久国产亚洲精品超碰热 | 97资源共享在线视频 | 亚洲男人av天堂午夜在 | 激情人妻另类人妻伦 | 精品无人国产偷自产在线 | 中文字幕 亚洲精品 第1页 | 撕开奶罩揉吮奶头视频 | 欧美亚洲日韩国产人成在线播放 | 精品久久综合1区2区3区激情 | 国产av一区二区精品久久凹凸 | 无码国产色欲xxxxx视频 | 免费网站看v片在线18禁无码 | 给我免费的视频在线观看 | 色欲久久久天天天综合网精品 | 色欲av亚洲一区无码少妇 | 国产极品美女高潮无套在线观看 | 丁香花在线影院观看在线播放 | 亚洲国产精品久久久久久 | 国产精品久久久久久亚洲毛片 | 波多野结衣一区二区三区av免费 | 亚洲精品久久久久中文第一幕 | 中文无码成人免费视频在线观看 | 人人妻人人澡人人爽精品欧美 | 奇米影视7777久久精品 | 俄罗斯老熟妇色xxxx | 中文无码成人免费视频在线观看 | 久久成人a毛片免费观看网站 | 5858s亚洲色大成网站www | 国产区女主播在线观看 | 伊人久久大香线焦av综合影院 | 精品国产国产综合精品 | 久久综合网欧美色妞网 | 亚洲欧美日韩国产精品一区二区 | 初尝人妻少妇中文字幕 | 99久久精品午夜一区二区 | 欧美人妻一区二区三区 | 亚洲 激情 小说 另类 欧美 | 精品人人妻人人澡人人爽人人 | 久久精品女人的天堂av | 丰满少妇熟乱xxxxx视频 | 国产三级精品三级男人的天堂 | 精品久久综合1区2区3区激情 | 亚洲中文无码av永久不收费 | 婷婷色婷婷开心五月四房播播 | www成人国产高清内射 | 噜噜噜亚洲色成人网站 | 波多野结衣一区二区三区av免费 | 成人精品天堂一区二区三区 | 日日躁夜夜躁狠狠躁 | 最新国产乱人伦偷精品免费网站 | 亚洲精品久久久久久久久久久 | 亚洲va欧美va天堂v国产综合 | 免费无码肉片在线观看 | 大乳丰满人妻中文字幕日本 | 中文无码精品a∨在线观看不卡 | 无套内谢的新婚少妇国语播放 | 最新国产乱人伦偷精品免费网站 | 97精品国产97久久久久久免费 | 久久久国产一区二区三区 | 欧美日本精品一区二区三区 | 日本乱偷人妻中文字幕 | 欧美性色19p | 理论片87福利理论电影 | 中文字幕 人妻熟女 | 乱人伦人妻中文字幕无码 | 永久黄网站色视频免费直播 | 亚洲熟熟妇xxxx | 大肉大捧一进一出视频出来呀 | 亚洲国产精品毛片av不卡在线 | 乱码av麻豆丝袜熟女系列 | 国产精品久免费的黄网站 | 无码午夜成人1000部免费视频 | 久久99精品国产.久久久久 | 久久久久国色av免费观看性色 | 日日碰狠狠躁久久躁蜜桃 | 亚洲日本一区二区三区在线 | 亚洲综合色区中文字幕 | 日本www一道久久久免费榴莲 | 成人精品一区二区三区中文字幕 | 欧美丰满老熟妇xxxxx性 | 国产午夜亚洲精品不卡下载 | av无码电影一区二区三区 | 精品国产一区av天美传媒 | 天天摸天天透天天添 | 亚洲色欲色欲欲www在线 | 国产精品久久久久久亚洲影视内衣 | 国产av久久久久精东av | 俄罗斯老熟妇色xxxx | 亚洲成a人片在线观看无码3d | 国产在线一区二区三区四区五区 | 伊在人天堂亚洲香蕉精品区 | 国产精品久久福利网站 | 国产猛烈高潮尖叫视频免费 | 激情五月综合色婷婷一区二区 | 国产成人一区二区三区别 | 亚洲乱码国产乱码精品精 | 成人女人看片免费视频放人 | 成人欧美一区二区三区 | 欧美熟妇另类久久久久久不卡 | 国产精品亚洲专区无码不卡 | 76少妇精品导航 | 四虎影视成人永久免费观看视频 | 夜夜夜高潮夜夜爽夜夜爰爰 | 欧美老妇交乱视频在线观看 | 亚洲 a v无 码免 费 成 人 a v | 亚洲自偷自偷在线制服 | 2019nv天堂香蕉在线观看 | 牲欲强的熟妇农村老妇女 | 乌克兰少妇xxxx做受 | 日日鲁鲁鲁夜夜爽爽狠狠 | 久久久精品成人免费观看 | 曰本女人与公拘交酡免费视频 | av香港经典三级级 在线 | 无码中文字幕色专区 | 久久久久av无码免费网 | 丁香啪啪综合成人亚洲 | 久久久久久av无码免费看大片 | 少妇太爽了在线观看 | 午夜肉伦伦影院 | 亚洲日本一区二区三区在线 | 日韩少妇白浆无码系列 | 日韩少妇内射免费播放 | 国产精品人人爽人人做我的可爱 | 自拍偷自拍亚洲精品10p | 亚洲日本va中文字幕 | 国产精华av午夜在线观看 | 婷婷综合久久中文字幕蜜桃三电影 | 国内精品一区二区三区不卡 | 久久人人爽人人人人片 | 丰满诱人的人妻3 | 亚洲中文无码av永久不收费 | 精品欧美一区二区三区久久久 | 在线成人www免费观看视频 | 成人毛片一区二区 | 久久精品国产大片免费观看 | 国语精品一区二区三区 | 国产亚洲精品久久久久久久 | 欧美大屁股xxxxhd黑色 | 亚洲中文字幕无码中字 | 成人无码精品1区2区3区免费看 | 国产亚av手机在线观看 | 无遮挡啪啪摇乳动态图 | 国精品人妻无码一区二区三区蜜柚 | 国产亚洲精品久久久久久久 | 人人妻人人澡人人爽欧美一区 | 国产亚洲tv在线观看 | 亚洲精品一区三区三区在线观看 | 国产人妻精品一区二区三区 | 成年美女黄网站色大免费全看 | 精品国产aⅴ无码一区二区 | 久久久国产精品无码免费专区 | 又黄又爽又色的视频 | 欧美 日韩 亚洲 在线 | 精品久久久久香蕉网 | 天天拍夜夜添久久精品 | 2020久久香蕉国产线看观看 | 成人无码精品1区2区3区免费看 | 久久99久久99精品中文字幕 | 无遮挡国产高潮视频免费观看 | 亚洲中文字幕无码一久久区 | 色欲人妻aaaaaaa无码 | 久久综合给合久久狠狠狠97色 | 亚洲日韩av一区二区三区四区 | 亚洲精品无码人妻无码 | 爱做久久久久久 | 亚洲精品美女久久久久久久 | 免费观看激色视频网站 | 国产精品亚洲综合色区韩国 | 久久精品一区二区三区四区 | 国产亚洲精品久久久久久久久动漫 | 无遮无挡爽爽免费视频 | 午夜福利一区二区三区在线观看 | 欧美阿v高清资源不卡在线播放 | 麻豆国产丝袜白领秘书在线观看 | 福利一区二区三区视频在线观看 | 东京热男人av天堂 | 国产成人一区二区三区别 | 成人性做爰aaa片免费看 | 成人无码精品1区2区3区免费看 | 岛国片人妻三上悠亚 | 免费无码一区二区三区蜜桃大 | 精品无码成人片一区二区98 | 亚洲 另类 在线 欧美 制服 | 国产一区二区三区精品视频 | 亚洲中文字幕在线无码一区二区 | 中文字幕无码视频专区 | 久青草影院在线观看国产 | 国产乱码精品一品二品 | 国产无遮挡吃胸膜奶免费看 | 鲁大师影院在线观看 | 亚洲成av人综合在线观看 | 1000部啪啪未满十八勿入下载 | 人人澡人摸人人添 | 自拍偷自拍亚洲精品被多人伦好爽 | aⅴ在线视频男人的天堂 | 少妇无码av无码专区在线观看 | 成人aaa片一区国产精品 | 精品一区二区三区无码免费视频 | 国产精品国产三级国产专播 | 99久久精品日本一区二区免费 | 国产精品毛片一区二区 | 蜜臀aⅴ国产精品久久久国产老师 | 黑人大群体交免费视频 | 无套内谢老熟女 | 欧美激情一区二区三区成人 | 欧美日韩综合一区二区三区 | 又黄又爽又色的视频 | 免费无码av一区二区 | 人妻少妇精品久久 | 国内少妇偷人精品视频 | 亚洲无人区午夜福利码高清完整版 | 99精品国产综合久久久久五月天 | 国产成人无码av在线影院 | 国产午夜视频在线观看 | 无码精品人妻一区二区三区av | 两性色午夜视频免费播放 | 亚洲男人av香蕉爽爽爽爽 | 色综合久久久久综合一本到桃花网 | 日本精品人妻无码免费大全 | 乱码午夜-极国产极内射 | 亚洲一区二区三区无码久久 | 精品成在人线av无码免费看 | 国产乱人偷精品人妻a片 | 5858s亚洲色大成网站www | 成人aaa片一区国产精品 | 国产精品-区区久久久狼 | 国产舌乚八伦偷品w中 | 日本一卡2卡3卡四卡精品网站 | 97资源共享在线视频 | 水蜜桃色314在线观看 | 国产精品丝袜黑色高跟鞋 | 狠狠色噜噜狠狠狠狠7777米奇 | 久久久久久九九精品久 | 中文毛片无遮挡高清免费 | 日本精品人妻无码免费大全 | 特级做a爰片毛片免费69 | 77777熟女视频在线观看 а天堂中文在线官网 | 国产亚洲精品久久久久久久久动漫 | 精品久久综合1区2区3区激情 | 又粗又大又硬毛片免费看 | 97无码免费人妻超级碰碰夜夜 | 色诱久久久久综合网ywww | 中文毛片无遮挡高清免费 | 久久午夜无码鲁丝片 | 97久久国产亚洲精品超碰热 | 亚洲呦女专区 | 国产精品久久久午夜夜伦鲁鲁 | 久久99久久99精品中文字幕 | 夜夜躁日日躁狠狠久久av | 日韩精品一区二区av在线 | 露脸叫床粗话东北少妇 | 午夜精品一区二区三区在线观看 | 午夜熟女插插xx免费视频 | 中文无码精品a∨在线观看不卡 | 亚洲国产精品无码久久久久高潮 | 香港三级日本三级妇三级 | 国产成人综合在线女婷五月99播放 | 久久综合九色综合欧美狠狠 | 夜夜高潮次次欢爽av女 | 国产成人精品优优av | www国产亚洲精品久久网站 | 东北女人啪啪对白 | 亚洲 a v无 码免 费 成 人 a v | 久久aⅴ免费观看 | 日韩精品无码免费一区二区三区 | 久久人人爽人人爽人人片av高清 | 国产亚洲美女精品久久久2020 | 日本一区二区三区免费播放 | 学生妹亚洲一区二区 | 国产偷国产偷精品高清尤物 | 欧美乱妇无乱码大黄a片 | 风流少妇按摩来高潮 | 亚洲娇小与黑人巨大交 | 51国偷自产一区二区三区 | 国产精品美女久久久网av | 国产亚洲美女精品久久久2020 | 亚洲高清偷拍一区二区三区 | 国产精品对白交换视频 | 国产成人综合美国十次 | 日本欧美一区二区三区乱码 | 精品国产一区二区三区av 性色 | аⅴ资源天堂资源库在线 | 亚洲精品一区三区三区在线观看 | 国产精品自产拍在线观看 | 欧洲美熟女乱又伦 | 无码人妻av免费一区二区三区 | 免费观看黄网站 | 成人动漫在线观看 | 无码av免费一区二区三区试看 | 欧美老人巨大xxxx做受 | 日日麻批免费40分钟无码 | 三级4级全黄60分钟 | 欧美国产亚洲日韩在线二区 | 十八禁真人啪啪免费网站 | 51国偷自产一区二区三区 | 色五月丁香五月综合五月 | 99久久亚洲精品无码毛片 | 欧美日韩色另类综合 | 激情人妻另类人妻伦 | 亚洲国产精品久久人人爱 | 久精品国产欧美亚洲色aⅴ大片 | 在线亚洲高清揄拍自拍一品区 | 丰满少妇人妻久久久久久 | 兔费看少妇性l交大片免费 | 日本精品人妻无码77777 天堂一区人妻无码 | 精品 日韩 国产 欧美 视频 | 性欧美牲交xxxxx视频 | 国产成人一区二区三区在线观看 | 无码福利日韩神码福利片 | 亚洲天堂2017无码中文 | 午夜精品久久久久久久久 | 成人三级无码视频在线观看 | 亚洲色偷偷偷综合网 | 2019nv天堂香蕉在线观看 | 夜精品a片一区二区三区无码白浆 | 天天摸天天碰天天添 | 久久97精品久久久久久久不卡 | 亚洲热妇无码av在线播放 | 精品人妻av区 | 国产午夜视频在线观看 | 久久久中文久久久无码 | 少妇性l交大片 | 男女作爱免费网站 | 国产免费无码一区二区视频 | 国产午夜无码精品免费看 | 在线视频网站www色 | 精品久久8x国产免费观看 | 国产极品美女高潮无套在线观看 | 久久久久se色偷偷亚洲精品av | 国产精品无码一区二区三区不卡 | 高清不卡一区二区三区 | 久久综合九色综合欧美狠狠 | 日韩精品乱码av一区二区 | 人人妻人人澡人人爽欧美一区 | 国产在热线精品视频 | 久久久久久av无码免费看大片 | 国产亚洲精品精品国产亚洲综合 | 国产亚洲人成在线播放 | 欧美日韩一区二区免费视频 | 无码福利日韩神码福利片 | 久久久久久九九精品久 | 51国偷自产一区二区三区 | 国产又粗又硬又大爽黄老大爷视 | 2020久久香蕉国产线看观看 | 亚洲第一无码av无码专区 | 清纯唯美经典一区二区 | 性欧美熟妇videofreesex | 亚洲人成人无码网www国产 | 亚洲综合无码久久精品综合 | 国产精品99久久精品爆乳 | 内射巨臀欧美在线视频 | 国产成人无码av片在线观看不卡 | √天堂中文官网8在线 | 久久久久久av无码免费看大片 | 久久99精品国产麻豆 | 国内精品九九久久久精品 | 亚洲欧美色中文字幕在线 | 女高中生第一次破苞av | 国产激情艳情在线看视频 | 色五月丁香五月综合五月 | 久久亚洲a片com人成 | 国内揄拍国内精品少妇国语 | 国产电影无码午夜在线播放 | 大地资源网第二页免费观看 | 美女扒开屁股让男人桶 | 亚洲七七久久桃花影院 | 精品国产一区二区三区av 性色 | 无码人妻出轨黑人中文字幕 | 成人一在线视频日韩国产 | 久久这里只有精品视频9 | 国产色在线 | 国产 | 美女极度色诱视频国产 | 中文字幕精品av一区二区五区 | 无码毛片视频一区二区本码 | 亚洲自偷精品视频自拍 | 人人妻人人澡人人爽精品欧美 | 国产网红无码精品视频 | 亚洲va欧美va天堂v国产综合 | 久久97精品久久久久久久不卡 | 偷窥日本少妇撒尿chinese | 高潮喷水的毛片 | 荫蒂被男人添的好舒服爽免费视频 | 天天摸天天碰天天添 | 久久久成人毛片无码 | 色综合久久久无码中文字幕 | 无码国产色欲xxxxx视频 | 三上悠亚人妻中文字幕在线 | 乱人伦人妻中文字幕无码久久网 | 3d动漫精品啪啪一区二区中 | 少妇邻居内射在线 | 欧美人与禽猛交狂配 | 欧美肥老太牲交大战 | 成人无码视频免费播放 | 国产莉萝无码av在线播放 | 狠狠躁日日躁夜夜躁2020 | 狂野欧美性猛xxxx乱大交 | 美女毛片一区二区三区四区 | 午夜熟女插插xx免费视频 | 日韩欧美中文字幕公布 | 国产九九九九九九九a片 | 最近中文2019字幕第二页 | 国产亚洲精品久久久闺蜜 | 丰满妇女强制高潮18xxxx | 国产午夜精品一区二区三区嫩草 | 狂野欧美激情性xxxx | 粉嫩少妇内射浓精videos | 亚洲小说图区综合在线 | 人人妻人人澡人人爽人人精品 | 国产另类ts人妖一区二区 | 午夜无码人妻av大片色欲 | 久久精品视频在线看15 | 波多野结衣乳巨码无在线观看 | 美女扒开屁股让男人桶 | 男人的天堂av网站 | 日韩人妻无码中文字幕视频 | 国产成人精品久久亚洲高清不卡 | 久久国产精品二国产精品 | 亚拍精品一区二区三区探花 | 亚洲中文字幕av在天堂 | 亚洲午夜无码久久 | 国产成人无码a区在线观看视频app | 亚洲一区二区三区含羞草 | 日日夜夜撸啊撸 | 国产xxx69麻豆国语对白 | 天堂а√在线中文在线 | 日本免费一区二区三区最新 | 欧美老熟妇乱xxxxx | av在线亚洲欧洲日产一区二区 | 精品一区二区不卡无码av | 精品一区二区三区无码免费视频 | 天天躁夜夜躁狠狠是什么心态 | 亚洲日韩av片在线观看 | 久久久久亚洲精品中文字幕 | 性色欲情网站iwww九文堂 | 午夜丰满少妇性开放视频 | 国产精品va在线观看无码 | 日韩无码专区 | 精品国产一区二区三区四区 | 成在人线av无码免观看麻豆 | 一本久久a久久精品vr综合 | 国产精品久久久久无码av色戒 | 久久精品人妻少妇一区二区三区 | 亚洲中文字幕无码一久久区 | 亚洲中文字幕无码中字 | 蜜桃臀无码内射一区二区三区 | 亚无码乱人伦一区二区 | 狂野欧美激情性xxxx | 影音先锋中文字幕无码 | 狠狠躁日日躁夜夜躁2020 | 国产亚洲精品久久久久久国模美 | 免费人成网站视频在线观看 | 国内精品人妻无码久久久影院蜜桃 | 国产人妻精品一区二区三区不卡 | 精品国产av色一区二区深夜久久 | 99久久精品无码一区二区毛片 | 午夜福利不卡在线视频 | 国产免费久久久久久无码 | 爆乳一区二区三区无码 | 99国产精品白浆在线观看免费 | 2019午夜福利不卡片在线 | 美女张开腿让人桶 | 国产熟妇另类久久久久 | 中文字幕乱码人妻无码久久 | 亚洲综合无码一区二区三区 | 日韩av无码中文无码电影 | 精品无码国产自产拍在线观看蜜 | 1000部啪啪未满十八勿入下载 | 成人亚洲精品久久久久软件 | 久久精品成人欧美大片 | 一本大道伊人av久久综合 | 亚洲熟妇色xxxxx亚洲 | 99精品视频在线观看免费 | 国产又粗又硬又大爽黄老大爷视 | 国产九九九九九九九a片 | 国产三级精品三级男人的天堂 | 国产婷婷色一区二区三区在线 | 人妻少妇精品久久 | 色综合视频一区二区三区 | 国产精品无码一区二区桃花视频 | 成人免费无码大片a毛片 | 88国产精品欧美一区二区三区 | 亚洲日韩av片在线观看 | 无码毛片视频一区二区本码 | 亚洲国产精品一区二区第一页 | 亚洲欧美综合区丁香五月小说 | 一本色道久久综合亚洲精品不卡 | 日日摸夜夜摸狠狠摸婷婷 | 成人亚洲精品久久久久软件 | 亚洲国产精品一区二区第一页 | 精品午夜福利在线观看 | 无码人妻黑人中文字幕 | 水蜜桃色314在线观看 | 国产精品自产拍在线观看 | a在线亚洲男人的天堂 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 精品无码国产一区二区三区av | 又大又硬又爽免费视频 | 丰腴饱满的极品熟妇 | 亚洲s码欧洲m码国产av | 亚洲午夜福利在线观看 | 成人aaa片一区国产精品 | 国产精品毛片一区二区 | 无码精品人妻一区二区三区av | 国内精品久久久久久中文字幕 | 无码人妻精品一区二区三区下载 | 亚洲精品中文字幕久久久久 | 捆绑白丝粉色jk震动捧喷白浆 | 人妻体内射精一区二区三四 | 99麻豆久久久国产精品免费 | 18禁止看的免费污网站 | 伊人久久大香线焦av综合影院 | 中文字幕+乱码+中文字幕一区 | 图片小说视频一区二区 | 中文字幕av日韩精品一区二区 | 欧美日韩一区二区综合 | 黑森林福利视频导航 | 国产香蕉尹人综合在线观看 | 午夜免费福利小电影 | 亚洲欧洲无卡二区视頻 | 国产一区二区三区影院 | 狠狠cao日日穞夜夜穞av | 丰满诱人的人妻3 | 天天躁日日躁狠狠躁免费麻豆 | 狠狠色欧美亚洲狠狠色www | 九一九色国产 | 中文久久乱码一区二区 | 一区二区三区高清视频一 | 午夜精品久久久内射近拍高清 | 亚洲精品www久久久 | 亚洲人成影院在线观看 | 欧洲熟妇色 欧美 | 久久综合激激的五月天 | 亚洲精品国产品国语在线观看 | 欧美亚洲日韩国产人成在线播放 | 国产婷婷色一区二区三区在线 | 久精品国产欧美亚洲色aⅴ大片 | 久久婷婷五月综合色国产香蕉 | 国内精品一区二区三区不卡 | 久久亚洲国产成人精品性色 | 久久人人97超碰a片精品 | 亚洲中文字幕av在天堂 | 色噜噜亚洲男人的天堂 | 国内少妇偷人精品视频免费 | √天堂中文官网8在线 | 天天拍夜夜添久久精品大 | 欧美色就是色 | 激情爆乳一区二区三区 | 无码乱肉视频免费大全合集 | 两性色午夜视频免费播放 | 97色伦图片97综合影院 | 99在线 | 亚洲 | 2019nv天堂香蕉在线观看 | 精品亚洲成av人在线观看 | 精品欧美一区二区三区久久久 | 精品无码国产自产拍在线观看蜜 | 亚洲精品久久久久中文第一幕 | 欧美三级a做爰在线观看 | 日韩精品无码一本二本三本色 | 国产精品无码一区二区三区不卡 | 亚洲啪av永久无码精品放毛片 | 国产va免费精品观看 | 蜜桃臀无码内射一区二区三区 | www国产精品内射老师 | 麻豆md0077饥渴少妇 | 一本精品99久久精品77 | 久久99热只有频精品8 | 久久人人爽人人爽人人片av高清 | 久久久婷婷五月亚洲97号色 | 亚洲精品国偷拍自产在线麻豆 | 国产9 9在线 | 中文 | 久久亚洲国产成人精品性色 | 国产精品成人av在线观看 | 亚洲第一无码av无码专区 | 国产无遮挡又黄又爽免费视频 | 纯爱无遮挡h肉动漫在线播放 | 久久97精品久久久久久久不卡 | 自拍偷自拍亚洲精品10p | 99精品国产综合久久久久五月天 | 暴力强奷在线播放无码 | 男女下面进入的视频免费午夜 | 欧美人与牲动交xxxx | 中文字幕日产无线码一区 | 亚洲精品一区二区三区大桥未久 | 国语精品一区二区三区 | 麻豆果冻传媒2021精品传媒一区下载 | 亚洲男人av香蕉爽爽爽爽 | 99er热精品视频 | 无码人妻少妇伦在线电影 | 亚洲色欲色欲天天天www | 久久人人爽人人爽人人片av高清 | 欧美日韩在线亚洲综合国产人 | 欧美人与牲动交xxxx | 中文无码成人免费视频在线观看 | 免费看少妇作爱视频 | 无码帝国www无码专区色综合 | 亚洲午夜福利在线观看 | 无码精品国产va在线观看dvd | 午夜福利电影 | 久久99精品国产.久久久久 | 性欧美videos高清精品 | 波多野结衣高清一区二区三区 | 亚洲 日韩 欧美 成人 在线观看 | 国产激情综合五月久久 | 少妇太爽了在线观看 | 日韩视频 中文字幕 视频一区 | 国产精品美女久久久久av爽李琼 | 久久久精品国产sm最大网站 | 性开放的女人aaa片 | 国产高清av在线播放 | 亚洲色无码一区二区三区 | 国产精品亚洲综合色区韩国 | 99久久无码一区人妻 | 少妇无套内谢久久久久 | 成 人影片 免费观看 | 久久熟妇人妻午夜寂寞影院 | 国产做国产爱免费视频 | 日本丰满护士爆乳xxxx | 国产激情无码一区二区app | 日本又色又爽又黄的a片18禁 | 99久久精品日本一区二区免费 | 久久国产精品偷任你爽任你 | 欧美日韩一区二区综合 | 在教室伦流澡到高潮hnp视频 | 国产精品二区一区二区aⅴ污介绍 | 亲嘴扒胸摸屁股激烈网站 | 精品偷拍一区二区三区在线看 | 国产亚av手机在线观看 | 亚洲成a人片在线观看日本 | 国产免费久久精品国产传媒 | 国产精品丝袜黑色高跟鞋 | 亚洲无人区午夜福利码高清完整版 | 国产精品毛片一区二区 | 国产小呦泬泬99精品 | 国产av一区二区三区最新精品 | 亚洲色在线无码国产精品不卡 | 国产成人综合色在线观看网站 | 国产午夜手机精彩视频 | 国产精品无码永久免费888 | 99精品久久毛片a片 | 久久视频在线观看精品 | 大肉大捧一进一出视频出来呀 | 捆绑白丝粉色jk震动捧喷白浆 | 国产日产欧产精品精品app | 久久久久国色av免费观看性色 | 在线观看欧美一区二区三区 | 中文字幕精品av一区二区五区 | 亚洲国产欧美在线成人 | 日本精品少妇一区二区三区 | 熟女少妇在线视频播放 | 日韩欧美中文字幕公布 | 大色综合色综合网站 | 欧美黑人性暴力猛交喷水 | 国精品人妻无码一区二区三区蜜柚 | 国内精品人妻无码久久久影院蜜桃 | 无码精品国产va在线观看dvd | 内射巨臀欧美在线视频 | 国产办公室秘书无码精品99 | 岛国片人妻三上悠亚 | 老子影院午夜伦不卡 | 久久人人爽人人爽人人片ⅴ | 国产亚洲精品久久久久久大师 | 在线播放免费人成毛片乱码 | 国产精品-区区久久久狼 | 高清国产亚洲精品自在久久 | 亚洲精品一区二区三区婷婷月 | 在线 国产 欧美 亚洲 天堂 | 国产精品亚洲一区二区三区喷水 | 国产在热线精品视频 | 少妇无码一区二区二三区 | 国产精品怡红院永久免费 | 久久天天躁狠狠躁夜夜免费观看 | 久久综合九色综合97网 | 国产亚洲精品久久久久久久久动漫 | 亚洲日韩中文字幕在线播放 | 欧美精品无码一区二区三区 | 人妻体内射精一区二区三四 | 任你躁国产自任一区二区三区 | 欧美性猛交内射兽交老熟妇 | 亚洲色偷偷偷综合网 | 国产三级久久久精品麻豆三级 | 国产热a欧美热a在线视频 | 久久久久亚洲精品中文字幕 | 丁香花在线影院观看在线播放 | 乌克兰少妇性做爰 | 精品久久久无码人妻字幂 | 少女韩国电视剧在线观看完整 | 窝窝午夜理论片影院 | 蜜桃av蜜臀av色欲av麻 999久久久国产精品消防器材 | 国产成人无码av片在线观看不卡 | 色情久久久av熟女人妻网站 | 久久国产精品精品国产色婷婷 | 精品久久久久久人妻无码中文字幕 | 樱花草在线社区www | 青春草在线视频免费观看 | 亚欧洲精品在线视频免费观看 | 国产成人无码a区在线观看视频app | 亚洲国产精品无码一区二区三区 | 久久精品人妻少妇一区二区三区 | 久久精品国产一区二区三区 | 国内少妇偷人精品视频 | av无码不卡在线观看免费 | 日本成熟视频免费视频 | 久久久久久久人妻无码中文字幕爆 | 少妇邻居内射在线 | 久久综合狠狠综合久久综合88 | 国产成人精品视频ⅴa片软件竹菊 | 性色欲情网站iwww九文堂 | 久久伊人色av天堂九九小黄鸭 | 天天综合网天天综合色 | 国产极品视觉盛宴 | 青青草原综合久久大伊人精品 | 色 综合 欧美 亚洲 国产 | 精品国产一区av天美传媒 | 国产精品久久久久久亚洲影视内衣 | 成人免费视频一区二区 | 又大又硬又爽免费视频 | 免费观看激色视频网站 | 欧美人与牲动交xxxx | 久久亚洲中文字幕精品一区 | 自拍偷自拍亚洲精品被多人伦好爽 | 扒开双腿吃奶呻吟做受视频 | 日本大乳高潮视频在线观看 | 色综合视频一区二区三区 | 曰本女人与公拘交酡免费视频 | 在线а√天堂中文官网 | 精品乱子伦一区二区三区 | 久久精品99久久香蕉国产色戒 | 久久无码中文字幕免费影院蜜桃 | 欧美熟妇另类久久久久久不卡 | 无码国产激情在线观看 | 欧美一区二区三区视频在线观看 | yw尤物av无码国产在线观看 | 99精品国产综合久久久久五月天 | 狠狠色噜噜狠狠狠狠7777米奇 | 极品尤物被啪到呻吟喷水 | 国产亚洲视频中文字幕97精品 | 亚洲国产欧美日韩精品一区二区三区 | 大乳丰满人妻中文字幕日本 | 精品国产一区二区三区四区 | 国产亚洲美女精品久久久2020 | 丰满妇女强制高潮18xxxx | 国产黑色丝袜在线播放 | 精品国产一区二区三区四区 | 成年女人永久免费看片 | а√天堂www在线天堂小说 | 51国偷自产一区二区三区 | 精品国产一区二区三区四区在线看 | 无人区乱码一区二区三区 | 日本熟妇乱子伦xxxx | 精品无码成人片一区二区98 | 无码人妻精品一区二区三区不卡 | 国产农村乱对白刺激视频 | 国产高清av在线播放 | 岛国片人妻三上悠亚 | 国产情侣作爱视频免费观看 | 国产特级毛片aaaaaaa高清 | 欧美三级a做爰在线观看 | 免费无码的av片在线观看 | 国产成人无码a区在线观看视频app | 国产成人综合色在线观看网站 | 亚洲无人区一区二区三区 | 日韩精品乱码av一区二区 | 中文字幕无码乱人伦 | 兔费看少妇性l交大片免费 | 亚洲国产综合无码一区 | 久久这里只有精品视频9 | 夜精品a片一区二区三区无码白浆 | 鲁一鲁av2019在线 | 人妻少妇被猛烈进入中文字幕 | 丝袜足控一区二区三区 | 最近的中文字幕在线看视频 | 亚洲国产精华液网站w | 亚洲成色在线综合网站 | 国产人妻人伦精品 | 狠狠综合久久久久综合网 | 国产成人一区二区三区别 | 永久免费观看国产裸体美女 | 免费观看激色视频网站 | 婷婷色婷婷开心五月四房播播 | 麻豆国产丝袜白领秘书在线观看 | 给我免费的视频在线观看 | 日本熟妇乱子伦xxxx | 亚洲精品午夜国产va久久成人 | 少妇激情av一区二区 | 97夜夜澡人人双人人人喊 | 鲁一鲁av2019在线 | 欧美老人巨大xxxx做受 | 国产美女精品一区二区三区 | 国产尤物精品视频 | 人人妻在人人 | 国产黑色丝袜在线播放 | 欧美一区二区三区视频在线观看 | 国产凸凹视频一区二区 | 亚洲精品久久久久久一区二区 | 久久久久se色偷偷亚洲精品av | 久久婷婷五月综合色国产香蕉 | 黑人粗大猛烈进出高潮视频 | 粉嫩少妇内射浓精videos | 国产色视频一区二区三区 | 奇米综合四色77777久久 东京无码熟妇人妻av在线网址 | 乱人伦人妻中文字幕无码久久网 | 亚洲欧洲中文日韩av乱码 | 风流少妇按摩来高潮 | 俺去俺来也www色官网 | 亚洲精品国产精品乱码不卡 | 欧美亚洲国产一区二区三区 | 国产av剧情md精品麻豆 | 秋霞成人午夜鲁丝一区二区三区 | 色噜噜亚洲男人的天堂 | 久久久成人毛片无码 | 色窝窝无码一区二区三区色欲 | 国产成人无码区免费内射一片色欲 | 国产精品多人p群无码 | 国产午夜福利100集发布 | 免费国产成人高清在线观看网站 | 亚洲日韩一区二区 | 精品国产乱码久久久久乱码 | 久久亚洲中文字幕无码 | 久青草影院在线观看国产 | 日韩精品无码一本二本三本色 | 精品国产麻豆免费人成网站 | 久精品国产欧美亚洲色aⅴ大片 | 国产内射老熟女aaaa | 色一情一乱一伦一区二区三欧美 | 丝袜足控一区二区三区 | 岛国片人妻三上悠亚 | 成人性做爰aaa片免费看不忠 | 欧美成人午夜精品久久久 | www成人国产高清内射 | 粗大的内捧猛烈进出视频 | 色欲久久久天天天综合网精品 | 亚洲国产精品美女久久久久 | 荡女精品导航 | 亚洲一区二区三区四区 | 美女黄网站人色视频免费国产 | 欧美人与禽猛交狂配 | 人人妻人人澡人人爽欧美一区 | 狂野欧美激情性xxxx | 男人的天堂2018无码 | 色欲av亚洲一区无码少妇 | 国产成人久久精品流白浆 | 欧美性生交xxxxx久久久 | 久久久亚洲欧洲日产国码αv | av无码电影一区二区三区 | 蜜桃视频插满18在线观看 | 国产福利视频一区二区 | 无码中文字幕色专区 | 人妻天天爽夜夜爽一区二区 | 国产成人一区二区三区在线观看 | 久久综合给合久久狠狠狠97色 | 国产精品久久久久影院嫩草 | 亚洲精品一区二区三区大桥未久 | 婷婷综合久久中文字幕蜜桃三电影 | 欧美日韩一区二区三区自拍 | 日韩av无码一区二区三区 | 日日碰狠狠躁久久躁蜜桃 | 国产精品久久精品三级 | 无码人妻av免费一区二区三区 | 中文字幕乱妇无码av在线 | 377p欧洲日本亚洲大胆 | 婷婷五月综合激情中文字幕 | 久久99精品国产麻豆 | 国产后入清纯学生妹 | 久久zyz资源站无码中文动漫 | 久久精品国产99精品亚洲 | 亚洲乱码日产精品bd | 少妇一晚三次一区二区三区 | 久久国产精品萌白酱免费 | 国产在线一区二区三区四区五区 | 国产97色在线 | 免 | 少妇人妻大乳在线视频 | 欧美三级不卡在线观看 | 女人被男人爽到呻吟的视频 | 欧美老妇交乱视频在线观看 | 国产熟妇高潮叫床视频播放 | 亚洲色无码一区二区三区 | 国产又粗又硬又大爽黄老大爷视 | 国产69精品久久久久app下载 | 亚洲综合无码久久精品综合 | 亚洲中文字幕va福利 | 无码av岛国片在线播放 | 人人爽人人澡人人人妻 | 日韩精品无码一本二本三本色 | 7777奇米四色成人眼影 | 欧美性生交xxxxx久久久 | 亚洲欧美日韩成人高清在线一区 | 国产69精品久久久久app下载 | 国产精品第一国产精品 | 亚洲熟熟妇xxxx | 麻花豆传媒剧国产免费mv在线 | 最近免费中文字幕中文高清百度 | 大乳丰满人妻中文字幕日本 | 精品国产一区二区三区av 性色 | 国产av一区二区精品久久凹凸 | 99久久99久久免费精品蜜桃 | 激情综合激情五月俺也去 | 精品无码av一区二区三区 | 免费观看黄网站 | 天堂无码人妻精品一区二区三区 | 欧美黑人乱大交 | 亚洲综合精品香蕉久久网 | 又黄又爽又色的视频 | 精品国产一区二区三区av 性色 | 成人性做爰aaa片免费看不忠 | 小sao货水好多真紧h无码视频 | 久久这里只有精品视频9 | 国产香蕉97碰碰久久人人 | 天天做天天爱天天爽综合网 | 欧美一区二区三区 | 久久国语露脸国产精品电影 | 欧美性色19p | 精品少妇爆乳无码av无码专区 | 全球成人中文在线 | yw尤物av无码国产在线观看 | 97精品人妻一区二区三区香蕉 | 蜜臀aⅴ国产精品久久久国产老师 | 欧美国产日产一区二区 | 美女毛片一区二区三区四区 | 999久久久国产精品消防器材 | 日韩精品乱码av一区二区 | 国产免费无码一区二区视频 | 免费观看的无遮挡av | av在线亚洲欧洲日产一区二区 | 日欧一片内射va在线影院 | 超碰97人人做人人爱少妇 | 国产激情一区二区三区 | 色综合久久中文娱乐网 | 在线а√天堂中文官网 | 久久久精品人妻久久影视 | 亚洲中文字幕成人无码 | 成人一区二区免费视频 | 欧美丰满熟妇xxxx | 亚拍精品一区二区三区探花 | 精品人妻中文字幕有码在线 | 人妻少妇精品无码专区动漫 | 99久久久国产精品无码免费 | 成人亚洲精品久久久久 | 国产女主播喷水视频在线观看 | 亚洲欧美精品aaaaaa片 | 一本久道高清无码视频 | 澳门永久av免费网站 | 亚洲国产高清在线观看视频 | 又黄又爽又色的视频 | 久久www免费人成人片 | 无遮挡啪啪摇乳动态图 | 亚洲综合无码一区二区三区 | 国产农村妇女高潮大叫 | 欧美 日韩 亚洲 在线 | 午夜性刺激在线视频免费 | 亚洲国产高清在线观看视频 | 男人扒开女人内裤强吻桶进去 | a片免费视频在线观看 | 亚洲一区二区三区偷拍女厕 | 国内少妇偷人精品视频免费 | 欧美日韩在线亚洲综合国产人 | 精品国产aⅴ无码一区二区 | 综合网日日天干夜夜久久 | 图片小说视频一区二区 | 亚洲熟熟妇xxxx | 99精品久久毛片a片 | 国产三级精品三级男人的天堂 | 日日摸天天摸爽爽狠狠97 | 日本高清一区免费中文视频 | 在线看片无码永久免费视频 | 国产三级精品三级男人的天堂 | 亚洲gv猛男gv无码男同 | 少妇性荡欲午夜性开放视频剧场 | 欧美人与动性行为视频 | 亚洲国产欧美日韩精品一区二区三区 | 亚洲va中文字幕无码久久不卡 | 中国女人内谢69xxxx | 亚洲精品欧美二区三区中文字幕 | 亚洲欧美日韩综合久久久 | 我要看www免费看插插视频 | 亚洲国产精品无码一区二区三区 | 精品久久久无码人妻字幂 | 成人免费视频视频在线观看 免费 | 亚洲乱亚洲乱妇50p | 色妞www精品免费视频 | 久久午夜无码鲁丝片秋霞 | 丁香花在线影院观看在线播放 | 欧美freesex黑人又粗又大 | 在线欧美精品一区二区三区 | 国产成人午夜福利在线播放 | 亚洲 a v无 码免 费 成 人 a v | 日韩少妇白浆无码系列 | 妺妺窝人体色www在线小说 | 国产精品无码永久免费888 | 国产亚洲精品精品国产亚洲综合 | 亚洲欧洲中文日韩av乱码 | 九一九色国产 | 高潮毛片无遮挡高清免费视频 | 欧美高清在线精品一区 | 精品乱码久久久久久久 | 丁香啪啪综合成人亚洲 | 东京热男人av天堂 | 无码国模国产在线观看 | 色欲av亚洲一区无码少妇 | 色综合久久久无码网中文 | 久久国产自偷自偷免费一区调 | 中文亚洲成a人片在线观看 | 性欧美牲交xxxxx视频 | 好屌草这里只有精品 | 自拍偷自拍亚洲精品10p | 嫩b人妻精品一区二区三区 | 性色欲情网站iwww九文堂 | 午夜理论片yy44880影院 | 国产精品a成v人在线播放 | 熟女体下毛毛黑森林 | 亚洲色成人中文字幕网站 | 3d动漫精品啪啪一区二区中 | 熟女少妇在线视频播放 | 激情内射亚州一区二区三区爱妻 | 一二三四社区在线中文视频 | 精品国产麻豆免费人成网站 | 无码一区二区三区在线观看 | 精品国产一区av天美传媒 | 精品无码国产自产拍在线观看蜜 | 精品人妻中文字幕有码在线 | 55夜色66夜色国产精品视频 | 亚洲国产精品久久久天堂 | 欧美日韩在线亚洲综合国产人 | 高清国产亚洲精品自在久久 | 久久久精品456亚洲影院 | 亚洲成a人片在线观看无码3d | 婷婷丁香五月天综合东京热 | 久久国产自偷自偷免费一区调 | 中文字幕乱码亚洲无线三区 | 亚洲一区二区三区播放 | 国产高清不卡无码视频 | 一本久久伊人热热精品中文字幕 | 一本色道久久综合狠狠躁 | 红桃av一区二区三区在线无码av | 99久久婷婷国产综合精品青草免费 | 国产成人一区二区三区别 | 中文无码成人免费视频在线观看 | 色婷婷av一区二区三区之红樱桃 | 国内精品人妻无码久久久影院 | 亚洲欧洲日本无在线码 | 亚洲色大成网站www | 无码国模国产在线观看 | 亚洲精品国产精品乱码不卡 | 成在人线av无码免观看麻豆 | 亚洲中文字幕成人无码 | 成人无码视频在线观看网站 | 欧美 日韩 人妻 高清 中文 | 精品夜夜澡人妻无码av蜜桃 | 日韩人妻少妇一区二区三区 | 国产成人无码区免费内射一片色欲 | 国产亚洲tv在线观看 | 亚洲中文字幕成人无码 | 久久久亚洲欧洲日产国码αv | 亚洲成av人片天堂网无码】 | 少妇无套内谢久久久久 | 内射老妇bbwx0c0ck | 亚洲 欧美 激情 小说 另类 | 成人无码视频在线观看网站 | 我要看www免费看插插视频 | 三级4级全黄60分钟 | 天堂在线观看www | 中文字幕av伊人av无码av | 亚洲精品国产精品乱码不卡 | 免费无码肉片在线观看 | 色妞www精品免费视频 | 亚洲一区二区三区偷拍女厕 | 夜精品a片一区二区三区无码白浆 | 嫩b人妻精品一区二区三区 | 无码精品人妻一区二区三区av | 人人妻人人澡人人爽精品欧美 | 大色综合色综合网站 | 亚洲中文字幕乱码av波多ji | 日本精品少妇一区二区三区 | 日本一本二本三区免费 | 丰满人妻翻云覆雨呻吟视频 | 国产精品理论片在线观看 | 老头边吃奶边弄进去呻吟 | 色狠狠av一区二区三区 | 无人区乱码一区二区三区 | 亚洲男人av天堂午夜在 | 亚洲成av人影院在线观看 | 香港三级日本三级妇三级 | 国语自产偷拍精品视频偷 | 国产农村乱对白刺激视频 | 麻豆国产人妻欲求不满谁演的 | 天堂一区人妻无码 | 国产精品99久久精品爆乳 | 久久国产精品二国产精品 | 国产在热线精品视频 | 日韩精品乱码av一区二区 | 国产av一区二区三区最新精品 | 蜜桃臀无码内射一区二区三区 | 成人精品一区二区三区中文字幕 | 久久久久免费精品国产 | 精品国产成人一区二区三区 | 少妇性俱乐部纵欲狂欢电影 | 国产精品无码永久免费888 | 人妻少妇精品无码专区二区 | 露脸叫床粗话东北少妇 | 人妻无码αv中文字幕久久琪琪布 | 一本精品99久久精品77 | 亚洲人成人无码网www国产 | 麻豆成人精品国产免费 | 久久99精品国产麻豆 | 亚洲中文字幕在线观看 | 2020最新国产自产精品 | 精品国产青草久久久久福利 | 熟女少妇人妻中文字幕 | 国产极品美女高潮无套在线观看 | 波多野结衣av在线观看 | 性欧美牲交在线视频 | 欧美日韩综合一区二区三区 | 久久午夜无码鲁丝片午夜精品 | 三上悠亚人妻中文字幕在线 | 国产午夜精品一区二区三区嫩草 | 麻豆果冻传媒2021精品传媒一区下载 | 亚洲一区二区三区偷拍女厕 | 欧美兽交xxxx×视频 | 青草青草久热国产精品 | 精品国产一区二区三区av 性色 | 国产 浪潮av性色四虎 | 国产精品怡红院永久免费 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 精品aⅴ一区二区三区 | 亚洲一区二区三区 | 波多野结衣 黑人 | 欧美日韩亚洲国产精品 | 97资源共享在线视频 | 中文字幕无码日韩欧毛 | 又大又硬又爽免费视频 | 麻豆成人精品国产免费 | 300部国产真实乱 | 亚洲精品中文字幕 | 黑人粗大猛烈进出高潮视频 | 国产精品无码一区二区桃花视频 | 亚洲精品一区国产 | 麻豆果冻传媒2021精品传媒一区下载 | 无套内谢的新婚少妇国语播放 | 无码人妻丰满熟妇区五十路百度 | 国产成人午夜福利在线播放 | а√资源新版在线天堂 | 国产一区二区不卡老阿姨 | 又大又黄又粗又爽的免费视频 | 亚洲精品美女久久久久久久 | 丝袜美腿亚洲一区二区 | 亚洲熟妇色xxxxx亚洲 | 国产亚洲精品久久久久久国模美 | 久久久久久久人妻无码中文字幕爆 | 欧美自拍另类欧美综合图片区 | 18禁止看的免费污网站 | 人人妻人人澡人人爽人人精品浪潮 | 亚洲欧洲中文日韩av乱码 | 老头边吃奶边弄进去呻吟 | 女人高潮内射99精品 | 麻豆成人精品国产免费 | 人人妻人人藻人人爽欧美一区 | 国产无av码在线观看 | 少妇无套内谢久久久久 | 国产精品无码mv在线观看 | 男人和女人高潮免费网站 | 亚洲中文字幕无码一久久区 | 亚洲一区二区三区在线观看网站 | 亚洲综合在线一区二区三区 | 日韩av无码一区二区三区不卡 | 精品亚洲韩国一区二区三区 | 2020最新国产自产精品 | 曰韩无码二三区中文字幕 | 亚洲大尺度无码无码专区 | 国产精品99久久精品爆乳 | 国产精品久久国产三级国 | 国产麻豆精品一区二区三区v视界 | 亚洲精品一区二区三区婷婷月 | 亚洲码国产精品高潮在线 | 国内老熟妇对白xxxxhd | 久久久婷婷五月亚洲97号色 | 亚洲国产精品无码久久久久高潮 | 午夜无码区在线观看 | 国产精品无码一区二区桃花视频 | 欧美成人高清在线播放 | 人人妻人人澡人人爽欧美一区 | 男人的天堂2018无码 | 中国女人内谢69xxxx | v一区无码内射国产 | 欧美日韩综合一区二区三区 | 成人一在线视频日韩国产 | 亚洲精品久久久久久一区二区 | 久久久久av无码免费网 | 蜜臀aⅴ国产精品久久久国产老师 | 欧美丰满少妇xxxx性 | 免费看少妇作爱视频 | 成人aaa片一区国产精品 | 亚洲国产精品一区二区美利坚 | 精品久久8x国产免费观看 | 伊人久久婷婷五月综合97色 | 2019nv天堂香蕉在线观看 | 美女黄网站人色视频免费国产 | 国产精品久久久久久亚洲毛片 | 久久久久99精品国产片 | 老熟女重囗味hdxx69 |