久久精品国产精品国产精品污,男人扒开添女人下部免费视频,一级国产69式性姿势免费视频,夜鲁夜鲁很鲁在线视频 视频,欧美丰满少妇一区二区三区,国产偷国产偷亚洲高清人乐享,中文 在线 日韩 亚洲 欧美,熟妇人妻无乱码中文字幕真矢织江,一区二区三区人妻制服国产

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【核心API开发】Spark入门教程[3]

發布時間:2025/3/21 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【核心API开发】Spark入门教程[3] 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本教程源于2016年3月出版書籍《Spark原理、機制及應用》?,在此以知識共享為初衷公開部分內容,如有興趣,請支持正版書籍。

? ? ? ???

? ? ? ? Spark綜合了前人分布式數據處理架構和語言的優缺點,使用簡潔、一致的函數式語言Scala作為主要開發語言,同時為了方便更多語言背景的人使用,還支持Java、Python和R語言。Spark因為其彈性分布式數據集(RDD)的抽象數據結構設計,通過實現抽象類RDD可以產生面對不同應用場景的子類。本章將先介紹Spark編程模型、RDD的相關概念、常用API源碼及應用案例,然后具體介紹四大應用框架,為后續進一步學習Spark框架打下基礎。

?

3.1 Spark 編程模型概述

?

Spark的編程模型如圖3-1所示。

?

圖3-1?Spark編程模型

? ? ? ?開發人員在編寫Spark應用的時候,需要提供一個包含main函數的驅動程序作為程序的入口,開發人員根據自己的需求,在main函數中調用Spark提供的數據操縱接口,利用集群對數據執行并行操作。

?????? Spark為開發人員提供了兩類抽象接口。第一類抽象接口是彈性分布式數據集(Resilient Distributed Dataset,下文簡稱RDD),顧名思義,RDD是對數據集的抽象封裝,開發人員可以通過RDD提供的開發接口來訪問和操縱數據集合,而無需了解數據的存儲介質(內存或磁盤)、文件系統(本地文件系統、HDFS或Tachyon)、存儲節點(本地或遠程節點)等諸多實現細節;第二類抽象是共享變量(Shared Variables),通常情況下,一個應用程序在運行的時候會被劃分成分布在不同執行器之上的多個任務,從而提高運算的速度,每個任務都會有一份獨立的程序變量拷貝,彼此之間互不干擾,然而在某些情況下需要任務之間相互共享變量,Apache Spark提供了兩類共享變量,它們分別是:廣播變量(Broadcast Variable)和累加器(Accumulators)。第3.3節將介紹RDD的基本概念和RDD提供的編程接口,并在后面詳細解讀接口的源碼實現,從而加深對RDD的理解,此外會在第3.4節中介紹兩類共享變量的使用方法。

?

3.2 Spark Context

? ?????SparkContext是整個項目程序的入口,無論從本地讀取文件(textfile方法)還是從HDFS讀取文件或者通過集合并行化獲得RDD,都先要創建SparkContext對象,然后使用SparkContext對RDD進行創建和后續的轉換操作。本節主要介紹SparkContext類的作用和創建過程,然后通過一個簡單的例子向讀者介紹SparkContext的應用方法,從應用角度來理解其作用。

?

3.2.1 ?SparkContext的作用

?

SparkContext除了是Spark的主要入口,它也可以看作是對用戶的接口,它代表與Spark集群的連接對象,由圖3-2可以看到,SparkContext主要存在于Driver Program中。可以使用SparkContext來創建集群中的RDD、累積量和廣播量,在后臺SparkContext還能發送任務給集群管理器。每一個JVM只能有運行一個程序,即對應只有一個SparkContext處于激活狀態,因此在創建新的SparkContext前需要把舊的SparkContext停止。

?

圖3-2? SparkContext在Spark架構圖中的位置

3.2.2 SparkContext創建

?

? ? ? ?SparkContext的創建過程首先要加載配置文件,然后創建SparkEnv、TaskScheduler和DAGScheduler,具體過程和源碼分析如下。

1.加載配置文件SparkConf

? ? ? ?SparkConf在初始化時,需先選擇相關的配置參數,包含master、appName、sparkHome、jars、environment等信息,然后通過構造方法傳遞給SparkContext,這里的構造函數有多種表達形式,當SparkContex獲取了全部相關的本地配置信息后開始下一步操作。 def this(master: String, appName: String, conf: SparkConf) = this(SparkContext.updatedConf(conf, master, appName)) def this( master: String, appName: String, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map(), preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = { this(SparkContext.updatedConf(newSparkConf(),master,appName,sparkHome,jars,environment)) this.preferredNodeLocationData = preferredNodeLocationData } 2.創建SparkEnv 創建SparkConf后就需要創建SparkEnv,這里面包括了很多Spark執行時的重要組件,包括 MapOutputTracker、ShuffleFetcher、BlockManager等,在這里源碼是通過SparkEnv類的伴生對象SparkEnv Object內的createDriverEnv方法實現的。 private[spark] defcreateDriverEnv( conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus, mockOutputCommitCoordinator:Option[OutputCommitCoordinator] = None): SparkEnv = { assert(conf.contains("spark.driver.host"),"spark.driver.host is not set on the driver!") assert(conf.contains("spark.driver.port"),"spark.driver.port is not set on the driver!") val hostname =conf.get("spark.driver.host") val port =conf.get("spark.driver.port").toInt create( conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, isDriver = true, isLocal = isLocal, listenerBus = listenerBus, mockOutputCommitCoordinator =mockOutputCommitCoordinator ) } <p style="background:#F3F3F3;"> </p>

3.創建TaskScheduler

? ? ? ? 創建SparkEnv后,就需要創建SparkContext中調度執行方面的變量TaskScheduler。

private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master) private val heartbeatReceiver = env.actorSystem.actorOf( Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver") @volatile private[spark] var dagScheduler: DAGScheduler = _ try { dagScheduler = new DAGScheduler(this) } catch { case e: Exception => { try { stop() } finally { throw new SparkException("Error while constructing DAGScheduler", e) } } } // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's // constructor taskScheduler.start()

TaskScheduler是依據Spark的執行模式進行初始化的,詳細代碼在SparkContext中的createTaskScheduler方法中。在這里以Standalone模式為例,它會將sc傳遞給TaskSchedulerImpl,然后創建SparkDeploySchedulerBackend并初始化,最后返回Scheduler對象。

case SPARK_REGEX(sparkUrl) => val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) (backend, scheduler)

4.創建DAGScheduler

創建TaskScheduler對象后,再將TaskScheduler對象傳至DAGScheduler,用來創建DAGScheduler對象。

@volatile private[spark] var dagScheduler: DAGScheduler = _ try { dagScheduler = new DAGScheduler(this) } catch { case e: Exception => { try { stop() } finally { throw new SparkException("Error while constructing DAGScheduler", e) } } } def this(sc: SparkContext) = this(sc, sc.taskScheduler)

創建DAGScheduler后再調用其start()方法將其啟動。以上4點是整個SparkContext的創建過程,這其中包含了很多重要的步驟,從這個過程能理解Spark的初始啟動情況。?

?

3.2.3 使用shell

除了單獨編寫一個應用程序的方式之外,Spark還提供了一個交互式Shell來使用。在Shell中,用戶的每條語句都能在輸入完畢后及時得到結果,而無需手動編譯和運行程序。Shell的使用十分簡單,改變當前工作路徑到Spark的安裝目錄,執行指令$ ./bin/spark-shell即可進入Shell。
在Shell中,系統根據命令提供的參數自動配置和生成了一個SparkContext對象sc,直接使用即可,無需再手動實例化SparkContext。除了結果會實時顯示之外,其余操作與編寫單獨應用程序類似。讀者可直接參考Spark官方提供的Spark ProgrammingGuide等文檔,在此不做具體介紹。


3.2.4 應用實踐
?這里向讀者介紹一段用于統計文件中字母a和字母b出現頻率的Spark應用,通過這個程序向讀者展示SparkContext的用法。

【例3-1】簡單的Spark程序

/* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // 本地文件目錄 val conf = new SparkConf().setAppName("Simple Application") //給Application命名 val sc = new SparkContext(conf) //創建SparkContext val logData = sc.textFile(logFile, 2).cache() //緩存文件 val numAs = logData.filter(line => line.contains("a")).count() //計算字母a的個數 val numBs = logData.filter(line => line.contains("b")).count() //計算字母b的個數 println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) //打印結果 }

這個例子中,首先創建本地文件目錄logFile和配置文件conf,然后使用配置信息conf實例化SparkContext得到sc,得到sc之后就可以從本地文件中讀取數據并把數據轉化成RDD,并命名為logData,然后logData調用filter方法分別計算包含字母a的行數和包含字母b的行數,最后打印出結果。該例子中使用了SparkContext的實例化對象創建RDD數據集。

?

3.3 RDD簡介

? ? ? ?本節主要介紹彈性分布式數據集RDD的相關概念,其中包括RDD創建來源、兩種重要的Transformation和Action操作、數據持久化和檢查點機制,通過對Spark中RDD核心抽象的深入理解,能幫助讀者全面理解后面的RDD的分區、并行計算和依賴等機制和變換過程。

3.3.1 RDD創建

?? ? ? ?RDD是Spark應用程序開發過程中最為基本也最為重要的一類數據結構,RDD被定義為只讀、分區化的記錄集合,更為通俗來講,RDD是對原始數據的進一步封裝,封裝導致兩個結果:第一個結果是數據訪問權限被限制,數據只能被讀,而無法被修改;第二個結果是數據操作功能被強化,使得數據能夠實現分布式存儲、并發處理、自動容錯等等諸多功能。Spark的整個計算過程都是圍繞數據集RDD來進行,下面將會對RDD的創建以及數據結構進行簡單介紹。

1.RDD的兩類來源

??? 1)將未被封裝的原始數據進行封裝操作得到,根據原始數據的存在形式,又可被進一步分成由集合并行化獲得或從外部數據集中獲得。

? ? 2)由其他RDD通過轉換操作獲得,由于RDD的只讀特性,內部的數據無法被修改,因此RDD內部提供了一系列數據轉換(Transformation)操作接口,這類接口可返回新的RDD,而不影響原來的RDD內容。在后面第3章3.3節中將會對RDD的創建方法進行更加詳盡的說明。

2.RDD內部數據結構

1)分區信息的列表

2)對父RDD的依賴信息

3)對Key-Value鍵值對數據類型的分區器(可選)

4)計算分區的函數

5)每個數據分區的物理地址列表(可選)

? ? ? ? RDD的數據操作并非在調用內部接口的一刻便開始計算,而是遇到要求將數據返回給驅動程序,或者寫入到文件的接口時,才會進行真正的計算,這類會觸發計算的操作稱為動作(Action)操作,而這種延時計算的特性,被稱為RDD計算的惰性(Lazy),在第六章機篇將分別講述動作操作和惰性特征。

??? 在第1章中說過,Spark是一套內存計算框架,其能夠將頻繁使用的中間數據存儲在內存當中,數據被使用的頻率越高,性能提升越明顯。數據的內存化操作在RDD層次上,體現為RDD的持久化操作,在3.3.4節描述RDD的持久化操作。除此之外,RDD還提供了類似于持久化操作的檢查點機制,表面看上去與存儲在HDFS的持久化操作類似,實際使用上又有諸多不同,在3.3.5小節描述RDD的檢查點機制。

?

3.3.2 RDD轉換操作

?? ? ? ? 轉換(Transformation)操作是由一個RDD轉換到另一個新的RDD,例如,map操作在RDD中是一個轉換操作,map轉換會讓RDD中的每一個數據都通過一個指定函數得到一個新的RDD。

? ? ? ? RDD內部可以封裝任意類型的數據,但某些操作只能應用在封裝鍵值對類型數據的RDD之上,例如轉換操作reduceByKey、groupByKey和countByKey等。

?????? 表3-1展示了RDD所提供的所有轉換操作及其含義。

表3-1:RDD提供的轉換操作

Transformation

算子作用

map(func)

新RDD中的數據由原RDD中的每個數據通過函數func得到

filter(func)

新RDD種的數據由原RDD中每個能使函數func返回true值的數據組成

flatMap(func)

類似于map轉換,但func的返回值是一個Seq對象,Seq中的元素個數可以是0或者多個

mapPartitions(func)

類似于map轉換,但func的輸入不是一個數據項,則是一個分區,若RDD內數據類型為T,則func必須是Iterator<T> => Iterator<U>類型

mapPartitionsWithIndex(func)

類似于mapPartitions轉換,但func的數據還多了一個分區索引,即func類型是(Int, Iterator<T> => Iterator<U>)

sample(withReplacement, fraction, seed)

對fraction中的數據進行采樣,可以選擇是否要進行替換,需要提供一個隨機數種子

union(otherDataset)

新RDD中數據是原RDD與RDD otherDataset中數據的并集

Intersection(otherDataset)

新RDD中數據是原RDD與RDD otherDataset中數據的交集

distinct([numTasks])

新RDD中數據是原RDD中數據去重的結果

groupByKey([numTasks])

原RDD中數據類型為(K, V)對,新RDD中數據類型為(K, Iterator(V))對,即將相同K的所有V放到一個迭代器中

reduceByKey(func, [numTasks])

原RDD和新RDD數據的類型都為(K, V)對,讓原RDD相同K的所有V依次經過函數func,得到的最終值作為K的V

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

原RDD數據的類型為(K, V),新RDD數據的類型為(K, U),類似于groupbyKey函數,但聚合函數由用戶指定。鍵值對的值的類型可以與原RDD不同

sortByKey([ascending], [numTasks])

原RDD和新RDD數據的類型為(K, V)鍵值對,新RDD的數據根據ascending的指定順序或者逆序排序

join(otherDataset, [numTasks])

原RDD數據的類型為(K, V),otherDataset數據的類型為(K, W),對于相同的K,返回所有的(K, (V, W))

cogroup(otherDataset, [numTasks])

原RDD數據的類型為(K, V),otherDataset數據的類型為(K, W),對于相同的K,返回所有的(K, Iterator<V>, Iterator<W>)

catesian(otherDataset)

原RDD數據的類型為為T,otherDataset數據的類型為U,返回所有的(T, U)

pipe(command, [envValue])

令原RDD中的每個數據以管道的方式依次通過命令command,返回得到的標準輸出

coalesce(numPartitions)

減少原RDD中分區的數目至指定值numPartitions

repartition(numPartitions)

修改原RDD中分區的數目至指定值numPartitions



3.3.3 RDD動作操作

?? ? ? ? 相對于轉換,動作(Action)操作用于向驅動(Driver)程序返回值或者將值寫入到文件當中。例如reduce動作會使用同一個指定函數讓RDD中的所有數據做一次聚合,把運算的結果返回。表3-2展示了RDD所提供的所有動作操作及其含義。

表3-2:RDD提供的動作操作

Action

算子作用

reduce(func)

令原RDD中的每個值依次經過函數func,func的類型為(T, T) => T,返回最終結果

collect()

將原RDD中的數據打包成數組并返回

count()

返回原RDD中數據的個數

first()

返回原RDD中的第一個數據項

take(n)

返回原RDD中前n個數據項,返回結果為數組

takeSample(withReplacement, num, [seed])

對原RDD中的數據進行采樣,返回num個數據項

saveAsTextFile(path)

將原RDD中的數據寫入到文本文件當中

saveAsSequenceFile(path)(Java and Scala)

將原RDD中的數據寫入到序列文件當中

savaAsObjectFile(path)(Java and Scala)

將原RDD中的數據序列化并寫入到文件當中。可以通過SparkContext.objectFile()方法加載

countByKey()

原RDD數據的類型為(K, V),返回hashMap(K, Int),用于統計K出現的次數

foreach(func)

對于原RDD中的每個數據執行函數func,返回數組

?

3.3.4 惰性計算

?? ? ? ? 需要注意的是,一個RDD執行轉換操作之后,數據的計算是延遲的,新生成的RDD會記錄轉換的相關信息,包括父RDD的編號、用戶指定函數等等,但并不會立即執行計算操作,真正的計算操作過程得等到遇到一個動作操作(Action)才會執行,此外,除非用戶指定持久化操作,否則轉換過程中產生的中間數據在計算完畢后會被丟棄,即數據是非持久化。即使對同一個RDD執行相同的轉換操作,數據同樣會被重新計算。

?????? Spark采取惰性計算機制有其道理所在。例如可以實現通過map方法創建的一個新數據集,然后使用reduce方法,最終只返回 reduce 的結果給driver,而不是整個大的新數據集。

?

3.3.5 RDD持久化

? ? ?? ? 惰性計算的缺陷也是明顯的:中間數據默認不會保存,每次動作操作都會對數據重復計算,某些計算量比較大的操作可能會影響到系統的運算效率,因此Spark允許將轉換過程中手動將某些會被頻繁使用的RDD執行持久化操作,持久化后的數據可以被存儲在內存、磁盤或者Tachyon當中,這將使得后續的動作(Actions)變得更加迅速(通常快10倍)。

? ? ? ? 通過調用RDD提供的cache或persist函數即可實現數據的持久化,persist函數需要指定存儲級別(StorageLevel),cache等價于采用默認存儲級別的persist函數,Spark提供的存儲級別及其含義如表3-3所示。在6.4節會繼續討論RDD持久化過程在源碼級別上的實現細節。

表3-3? RDD的存儲級別

存儲級別

含義

MEMORY_ONLY

把RDD以非序列化狀態存儲在內存中,如果內存空間不夠,則有些分區數據會在需要的時候進行計算得到

MEMORY_AND_DISK

把RDD以非序列化存儲在內存中,如果內存空間不夠,則存儲在硬盤中

?

MEMORY_ONLY_SER

把RDD以Java對象序列化儲存在內存中,序列化后占用空間更小,尤其當使用快速序列化庫(如Kyro[1])時效果更好。缺點是讀數據要反序列化,會消耗CPU計算資源

MEMORY_AND_DISK_SER

類似MEMORY_ONLY_SER,區別是當內存不夠的時候會把RDD持久化到磁盤中,而不是在需要它們的時候實時計算

DISK_ONLY

只把RDD存儲到磁盤中

MEMORY_ONLY_2,

類似MEMORY_ONLY,不同的是會復制一個副本到另一個集群節點

MEMORY_AND_DISK_2, etc.

類似MEMORY_AND_DISK,不同的是會復制一個副本到另一個集群節點

?

OFF_HEAP

把RDD以序列化形式存儲在Tachyon中,與MEMORY_ONLY_SER不同的是,使用OFF-HEAP模式會減少垃圾回收的開銷,此外還能讓執行器共享內存,這種模式更適應于多并發和對內存要求高的環境

?

3.3.6 RDD檢查點

? ? ? ? 因為DAG中血統(lineage)如果太長,當重計算的時候開銷會很大,故使用檢查點機制,將計算過程持久化到磁盤,這樣如果出現計算故障的時候就可以在檢查點開始重計算,而不需要從頭開始。RDD的檢查點(Checkpoint)機制類似持久化機制中的persist(StorageLevel.DISK_ONLY),數據會被存儲在磁盤當中,兩者最大的區別在于:持久化機制所存儲的數據,在驅動程序運行結束之后會被自動清除;檢查點機制則會將數據永久存儲在磁盤當中,如果不手動刪除,數據會一直存在。換句話說,檢查點機制存儲的數據能夠被下一次運行的應用程序所使用。

? ? ? ? 檢查點的使用與持久化類似,調用RDD的checkpoint方法即可。在6.4小節中繼續介紹檢查點機制的實現以及其與持久化過程的區別。

?

3.4 共享變量

? ? ? ? 因為在tasks之間讀寫共享變量會很低效,spark提供兩種類型的共享變量類型,即broadcast variables和accumulators。

?

?

3.4.1 廣播變量 廣播變量(Broadcast variables)允許用戶將一個只讀變量緩存到每一臺機器之上,而不像傳統變量一樣,拷貝到每一個任務當中,同一臺機器上的不同任務可以共享該變量值。如例3-1代碼所示,對于變量v,只需要調用SparkContext.broadcast(v)即可得到變量v的廣播變量broadcastVar,通過調用broadcastVar的value方法即可取得變量值。

【例3-1】廣播變量的用法

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar:spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) 3.4.2 累加器

? ? ? ? 累加器(Accumulators)是另外一種共享變量。累加器變量只能執行加法操作,但其支持并行操作,這意味著不同任務多次對累加器執行加法操作后,加法器最后的值等于所有累加的和。累加器的值只能被驅動程序訪問,集群中的任務無法訪問該值。

【例3-2】累加器的用法  

scala> val accum = sc.accumulator(0, "My Accumulator") scala> accum.value() //(通過這種方法進行讀取原始變量值) accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) res2:Int = 10 3.5 Spark核心開發實踐

? ? ? ? 本節主要介紹核心開發中RDD的兩個主要操作算子Transformation和Action的使用方法,由于Spark是基于延遲計算,Transforamation算子并不立即執行,這時只是保存計算狀態,當Action算子出現才真正執行計算。為此下面就這兩個算子分別學習主要的API方法和應用實例,如果想了解更多關于RDD的API操作,建議讀者參考拉籌伯大學教授的個人主頁http://homepage.cs.latrobe.edu.au/zhe/。

3.5.1 單值型Tranformation算子

單值型的算子就是輸入為單個值形式,這里主要介紹map、flatMap、mapPartitions、union、cartesian、groupBy、filter、distinct、subtract、foreach、cache、persist、sample以及takeSample方法,如表3-4列出各方法的簡要概述。

表3-4? 單值型Transformation算子

方法名

方法定義

map

def map[U](f: (T) ? U)(implicit arg0: ClassTag[U]): RDD[U]

flatMap

defmapPartitions[U](f: (Iterator[T])??Iterator[U], preservesPartitioning: Boolean = false)

mapPartition

def mapPartitions[U](f: (Iterator[T])??Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

mapPartitionsWith

Index

def mapPartitionsWithIndex[U](f: (Int, Iterator[T])??Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

foreach

def foreach(f: (T)???Unit): Unit

foreachPartition

def foreachPartition(f: (Iterator[T])??Unit): Unit

glom

def glom(): RDD[Array[T]]

union

def union(other: RDD[T]): RDD[T]

cartesian

def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

groupBy

def groupBy[K](f: (T)???K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]

filter

def filter(f: (T)???Boolean): RDD[T]

distinct

def distinct(): RDD[T]

subtract

def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

cache

def cache(): RDD.this.type

persist

def persist(): RDD.this.type

sample

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

takeSample

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

1.map

? ? ? ? 對原來每一個輸入的RDD數據集進行函數轉換,返回的結果為新的RDD,該方法對分區操作是一對一的。

方法源碼實現: def map[U: ClassTag](f: T =>U): RDD[U] = new MappedRDD(this, sc.clean(f))

【例3-3】map方法應用樣例

val a = sc.parallelize(List("bit", "linc", "xwc", "fjg", "wc","spark"), 3) //創建RDD val b = a.map(word => word.length) //計算每個單詞的長度 val c = a.zip(b) //拉鏈方法,把兩列數據對應配對成鍵值對格式 c.collect //把結果轉換為數組 res0: Array[(String, Int)] = Array((bit,3), (linc,4), (xwc,3), (fjg,3), (wc,2),(spark,5))

這個例子中map方法從a中依次讀入一個單詞,然后計算單詞長度,把最后計算的長度賦給b,然后因為a和b的長度相同,使用zip方法將a、b中對應元素組成K-V鍵值對形式,最后使用Action算子中的collect方法把鍵值對以數組形式輸出。


圖3-3? map方法應用樣例

2.flatMap

flapMap方法與map方法類似,但是允許在一次map方法中輸出多個對象,而不是map中的一個對象經過函數轉換生成另一個對象。

方法源碼實現:

def flatMap[U: ClassTag](f: T=> TraversableOnce[U]): RDD[U] =new FlatMappedRDD(this, sc.clean(f))

【例3-4】flatMap方法應用樣例

val a = sc.parallelize(1 to 10, 5) //生成從1到10的序列,5個分區 a.flatMap(num => 1 to num).collect //方法的作用是把每一個num映射到從1到num的序列 res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

這個例子先得到從1到10的序列,然后調用flatMap方法對輸入num依次生成從1到num的序列,最后使用collect方法轉換成數組輸出。

3.mapPartitions

mapPartitions是map的另一個實現。map的輸入函數是應用于RDD中每個元素,而mapPartitions的輸入函數是作用于每個分區,也就是把每個分區中的內容作為整體來處理的。

方法源碼實現:

def mapPartitions[U:ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean =false): RDD[U] = {val func = (context: TaskContext, index:Int, iter: Iterator[T]) => f(iter)new MapPartitionsRDD(this, sc.clean(func),preservesPartitioning)}

【例3-5】mapPartitions方法應用樣例

scala> val a = sc.parallelize(1 to 9, 3) scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next res .::= (pre, cur) pre = cur } res.iterator } scala> a.mapPartitions(myfunc).collect res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

如圖3-4,這個例子是先得到從1到9的序列,因為有3個分區,所以每個分區數據分別是(1,2,3),(4,5,6)和(7,8,9),然后調用mapPartitions方法,因為scala是函數式編程,函數能作為參數值,所以mapPartition方法輸入參數是myfunc函數。myfunc函數的作用是先構造一個空list集合,輸入單元素集合iter,輸出雙元素Tuple集合,把分區中一個元素和它的下一個元素組成一個Tuple。因為每個分區中最后一個元素沒有下一個元素,所以(3,4)和(6,7)不在結果中。

mapPartitions還有其他的類似實現,比如mapPartitionsWithContext,它能把處理過程中的一些狀態信息傳遞給用戶指定的輸入函數,此外還有mapPartitionsWithIndex,它能把分區中的index信息傳遞給用戶指定的輸入函數,這些其他類似的實現都是基于map方法,只是細節不同,這樣做更方面使用者在不同場景下的應用。

圖3-4? mapPartitions方法應用樣例

4.mapPartitionWithIndex

mapPartitionWithIndex方法與mapPartitions方法功能類似,不同的是mapPartition-WithIndex還會對原始分區的索引進行追蹤,這樣能知道分區所對應的元素,方法的參數為一個函數,函數的輸入為整型索引和迭代器。

方法源碼實現:

def mapPartitionsWithIndex[U:ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning:Boolean = false): RDD[U] = {val func = (context: TaskContext, index:Int, iter: Iterator[T]) => f(index, iter)new MapPartitionsRDD(this, sc.clean(func),preservesPartitioning)}

【例3-6】mapPartitionWithIndex方法應用樣例

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = { iter.toList.map(x => index + "," + x).iterator } x.mapPartitionsWithIndex(myfunc).collect() res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)

這個例子中先得到一個名為的x序列,然后調用mapPartitionsWithIndex方法,參數為myfunc函數,這個函數的實現是把輸入經過map方法映射為分區索引加值的形式。結果中的0,1表示分區下標0和第一個輸入值1,后面依次輸出其他分區和對應的值,說明分區數是從下標0開始的。

5.foreach

foreach方法主要是對輸入的數據對象執行循環操作,該方法常用來輸出RDD中的內容。

方法源碼實現:

def foreach(f: T => Unit) {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) =>iter.foreach(cleanF))}

【例3-7】foreach方法應用樣例

val c = sc.parallelize(List("xwc", "fjg", "wc", "dcp", "zq", "snn", "mk", "zl", "hk", "lp"), 3) c.foreach(x => println(x + " are from BIT")) xwc are from BIT fjg are from BIT wc are from BIT dcp are from BIT zq are from BIT ssn are from BIT mk are from BIT zl are from BIT hk are from BIT lp are from BIT

這個方法比較直觀,直接對c變量中的每一個元素對象使用println函數,打印對象內容。

6.foreachPartition

foreachPartition方法的作用是通過迭代器參數對RDD中每一個分區的數據對象應用函數。mapPartitions方法的作用于foreachPartition方法作用非常相似,區別就在于使用的參數是否有返回值。

方法源碼實現:

def foreachPartition(f:Iterator[T] => Unit) {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) =>cleanF(iter))}

【例3-8】foreachPartition方法應用樣例

val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) b.foreachPartition(x => println((a,b) => x.reduce(a + b))) 6 15 24

這個例子是將序列b中的每一個元素進行reduce操作,對每個分區中輸入的每一個元素累加,例如對于分區0,輸入1和2相加等于3,然后把上個結果3與下一個輸入3相加就等于6,其他分區的運算與該分區一樣。

7.glom

作用類似collect,但它不知直接將所有RDD直接轉化為數組形式,glom方法的作用是將RDD中分區數據進行組裝到數組類型RDD中,每一個返回的數組包含一個分區的元素,按分區轉化為數組,最后有幾個分區就返回幾個數組類型的RDD。

方法源碼實現:

def glom(): RDD[Array[T]] = newGlommedRDD(this)private[spark] classGlommedRDD[T: ClassTag](prev: RDD[T])extends RDD[Array[T]](prev) {overridedef getPartitions: Array[Partition] = firstParent[T].partitionsoverridedef compute(split: Partition, context: TaskContext) =Array(firstParent[T].iterator(split,context).toArray).iterator}

【例3-9】glom方法應用樣例

val a = sc.parallelize(1 to 99, 3) a.glom.collect res5: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99))

這個例子很簡潔,在執行glom方法后就調用collect方法獲得Array數組并輸出,可以看出a.glom方法輸出的是三個數組組成的RDD,其中每個數組代表一個分區數據。

8.union

union方法(等價于“++”)是將兩個RDD取并集,取并集過程中不會把相同元素去掉。union操作是輸入分區與輸出分區多對一模式,如圖所示。

方法源碼實現:

def union(other: RDD[T]):RDD[T] = new UnionRDD(sc, Array(this, other))class UnionRDD[T: ClassTag](sc: SparkContext,var rdds: Seq[RDD[T]])extends RDD[T](sc, Nil) {overridedef getPartitions: Array[Partition] = {val array = newArray[Partition](rdds.map(_.partitions.size).sum)var pos = 0for ((rdd, rddIndex) <- rdds.zipWithIndex;split <- rdd.partitions) {array(pos) = new UnionPartition(pos,rdd, rddIndex, split.index)pos += 1}array}overridedef getDependencies: Seq[Dependency[_]] = {valdeps = new ArrayBuffer[Dependency[_]]varpos = 0for(rdd <- rdds) {deps += new RangeDependency(rdd, 0, pos,rdd.partitions.size)pos += rdd.partitions.size}deps}override def compute(s: Partition, context:TaskContext): Iterator[T] = {valpart = s.asInstanceOf[UnionPartition[T]]parent[T](part.parentRddIndex).iterator(part.parentPartition,context)}overridedef getPreferredLocations(s: Partition): Seq[String] =s.asInstanceOf[UnionPartition[T]].preferredLocations()overridedef clearDependencies() {super.clearDependencies()rdds= null}}

【例3-10】union方法應用樣例

val a = sc.parallelize(1 to 4, 2) val b = sc.parallelize(2 to 4, 1) (a ++ b).collect res4: Array[Int] = Array(1, 2, 3, 4, 2, 3, 4)

如圖3-5可見,這個例子先創建2個RDD變量a和b,然后對a與b使用union方法,返回兩個RDD并集的結果。

圖3-5? union方法應用樣例

9.cartesian

計算兩個RDD中每個對象的笛卡爾積(例如第一個RDD中的每一個對象與第二個RDD中的對象join連接),但使用該方法時要注意可能出現內存不夠的情況。

方法源碼實現:

def cartesian[U:ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)class CartesianRDD[T: ClassTag,U: ClassTag](sc: SparkContext,var rdd1 : RDD[T],var rdd2 : RDD[U])extends RDD[Pair[T, U]](sc, Nil)with Serializable {valnumPartitionsInRdd2 = rdd2.partitions.sizeoverridedef getPartitions: Array[Partition] = {//create the cross product split val array = newArray[Partition](rdd1.partitions.size * rdd2.partitions.size)for(s1 <- rdd1.partitions; s2 <- rdd2.partitions) {val idx = s1.index * numPartitionsInRdd2+ s2.indexarray(idx) = new CartesianPartition(idx,rdd1, rdd2, s1.index, s2.index)}array}overridedef getPreferredLocations(split: Partition): Seq[String] = {valcurrSplit = split.asInstanceOf[CartesianPartition](rdd1.preferredLocations(currSplit.s1)++ rdd2.preferredLocations(currSplit.s2)).distinct}overridedef compute(split: Partition, context: TaskContext) = {valcurrSplit = split.asInstanceOf[CartesianPartition]for (x <- rdd1.iterator(currSplit.s1,context);y <- rdd2.iterator(currSplit.s2,context)) yield (x, y)}overridedef getDependencies: Seq[Dependency[_]] = List(new NarrowDependency(rdd1) {def getParents(id: Int): Seq[Int] = List(id/ numPartitionsInRdd2)},new NarrowDependency(rdd2) {def getParents(id: Int): Seq[Int] =List(id % numPartitionsInRdd2)})override def clearDependencies() {super.clearDependencies()rdd1 = nullrdd2 = null}}

【例3-11】cartesian方法應用樣例

val x =sc.parallelize(List(1,2,3),1) val y =sc.parallelize(List(4,5),1) x.cartesian(y).collect res0: Array[(Int, Int)] =Array((1,4),(1,5),(2,4),(2,5),(3,4),(3,5))

例子中x是第一個RDD,其中的每個元素都跟y中元素進行連接,如果第一個RDD有m個元素,第二個RDD中元素n個,則求笛卡爾積后總元素為m×n個,本例結果為6個,如圖3-6所示。

圖3-6? cartesian方法應用樣例

10.groupBy

groupBy方法有三個重載方法,功能是將元素通過map函數生成Key-Value格式,然后使用reduceByKey方法對Key-Value對進行聚合,具體可參考源碼實現。

方法源碼實現:

def groupBy[K](f: T => K, p:Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] = {valcleanF = sc.clean(f) //對用戶函數預處理this.map(t=> (cleanF(t), t)).groupByKey(p) //對數據進行map操作,生成Key-Value對,再聚合 }def groupBy[K](f: T =>K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =groupBy[K](f, defaultPartitioner(this)) //使用默認分區器 def groupBy[K](f: T => K,numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =groupBy(f, new HashPartitioner(numPartitions)) //使用hash分區器,分區數自定義

【例3-12】groupBy方法應用樣例

(1)val a = sc.parallelize(1 to 9, 3) a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9))) (2)val a = sc.parallelize(1 to 9, 3) def myfunc(a: Int) : Int = { a % 2 } a.groupBy(myfunc).collect res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9))) (3)val a = sc.parallelize(1 to 9, 3) def myfunc(a: Int) : Int = { a % 2 } a.groupBy(myfunc(_), 1).collect res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))

第一個例子中是單個參數,調用groupBy方法,結果集的key只有兩種,即even和odd,然后對相同的key進行聚合得到最終結果。第二個例子和第三個例子本質一樣,只是使用的重載方法不同。

11.filter

filter方法通過名稱就能猜出來功能,其實就是對輸入元素進行過濾,參數是一個返回值為boolean的函數,如果函數對輸入元素運算結果為true,則通過該元素,否則將該元素過濾,不能進入結果集。

方法源碼實現:

def filter(f: T => Boolean):RDD[T] = new FilteredRDD(this, sc.clean(f))

【例3-13】filter方法應用樣例

(1)val a = sc.parallelize(1 to 10, 3) val b = a.filter(x => x % 3 == 0) b.collect res3: Array[Int] = Array(3, 6, 9) (2)val b = sc.parallelize(1 to 8) b.filter(x => x < 4).collect res15: Array[Int] = Array(1, 2, 3) (3)val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog")) a.filter(_ < 4).collect <console>:15: error: value < is not a member of Any

第一個和第二個例子比較好理解,因為a中元素都是整型,可以順利進行比較,但第三個例子會報錯,因為a中有部分對象不能與整數比較,如果使用scala中的偏函數就可以解決混合數據類型的問題。

12.distinct

將RDD中重復的元素去掉,只留下唯一的RDD元素。

方法源碼實現:

def distinct(): RDD[T] =distinct(partitions.size)def distinct(numPartitions:Int)(implicit ord: Ordering[T] = null): RDD[T] =map(x => (x,null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

【例3-14】distinct方法應用樣例

(1)val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.distinct.collect res6: Array[String] = Array(Dog, Gnu, Cat, Rat) (2)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)) a.distinct(2).partitions.length res16: Int = 23)a.distinct(3).partitions.length res17: Int = 3

這個例子就是把RDD中的元素map為Key-Value對形式,然后使用reduceByKey將重復Key合并,也就是把重復元素刪除,只留下唯一的元素。此外distinct有一個重載方法需要一個參數,這個參數就是分區數numPartitions,從例子中看出使用帶參的distinct方法不僅能刪除重復元素,而且還能對結果重新分區。

13.subtract

subtract的含義就是求集合A-B的差,即把集合A中包含集合B的元素都刪除,結果是剩下的元素。

方法源碼實現:

def subtract(other: RDD[T], p:Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {if (partitioner == Some(p)) {val p2 = newPartitioner() {override def numPartitions = p.numPartitionsoverridedef getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)}// Unfortunately, since we're making a new p2,we'll get ShuffleDependencies//anyway, and when calling .keys, will not have a partitioner set, even though// the SubtractedRDD will, thanks to p2'sde-tupled partitioning, already be//partitioned by the right/real keys (e.g. p).this.map(x=> (x, null)).subtractByKey(other.map((_, null)), p2).keys} else {this.map(x=> (x, null)).subtractByKey(other.map((_, null)), p).keys}}

【例3-15】subtract方法應用樣例

val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.collect res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)

這個例子就是把a中包含b中的元素都刪除掉,底層實現使用subtractByKey,也就是根據鍵值對中的Key來刪除a包含的b元素。

14.persist,cache

cache方法顧名思義,是緩存數據,其作用是把RDD緩存到內存中,以方便下一次計算被再次調用。

方法源碼實現:

def cache(): this.type = persist()

【例3-16】cache方法應用樣例

val c = sc.parallelize(List("a", "b", "c", "d", "e", "f"),1) c.cache res11: c.type = ParallelCollectionRDD[10] at parallelize at <console>:21

這個例子就是直接把RDD緩存在內存中。

15.persist

persist方法的作用是把RDD根據不同的級別進行持久化,通過使用帶參數方法能指定持久化級別,如果不帶參數則為默認持久化級別,即只保存到內存,與cache等價。

【例3-17】persist方法應用樣例

val a = sc.parallelize(1 to 9, 3) a.persist(StorageLevel.MEMORY_ONLY)

這個例子使用persist方法,指定持久化級別為MEMORY_ONLY,該級別等價于cache方法。

16.sample

sample的作用是隨機的對RDD中的元素采樣,獲得一個新的子集RDD,根據參數能指定是否又放回采樣、子集占總數的百分比和隨機種子。

方法源碼實現:

def sample(withReplacement:Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] = {require(fraction >= 0.0, "Negativefraction value: " + fraction)if (withReplacement) {new PartitionwiseSampledRDD[T, T](this,new PoissonSampler[T](fraction), true, seed)} else {new PartitionwiseSampledRDD[T, T](this,new BernoulliSampler[T](fraction), true, seed)}}

【例3-18】sample方法應用樣例

(1)val a = sc.parallelize(1 to 1000, 2) a.sample(false, 0.1, 0).collect res4: Array[Int] = Array(3, 21, 22, 27, 48, 50, 57, 80, 88, 90, 97, 113, 126, 130, 135, 145, 162, 169, 182, 230, 237, 242, 267, 271, 287, 294, 302, 305, 324, 326, 330, 351, 361, 378, 383, 384, 409, 412, 418, 432, 433, 485, 493, 497, 502, 512, 514, 521, 522, 531, 536, 573, 585, 595, 615, 617, 629, 640, 642, 647, 651, 664, 671, 673, 684, 692, 707, 716, 718, 721, 723, 736, 738, 756, 759, 788, 799, 827, 828, 833, 872, 898, 899, 904, 915, 916, 919, 927, 929, 951, 969, 980) (2)val a = sc.parallelize(1 to 100, 2) a.sample(true, 0.3, 0).collect res5: Array[Int] = Array(1, 1, 9, 18, 18, 24, 26, 29, 32, 34, 37, 38, 42, 43, 45, 51, 54, 56, 60, 65, 67, 70, 73, 74, 74, 75, 85, 86, 95, 99)

上述例子中第一個參數withReplacement為true時使用放回抽樣(泊松抽樣[1]),為false時使用不放回抽樣(伯努利抽樣),第二個參數fraction是百分比,第三個參數seed是種子,也就是隨機取值的起源數字。從例子中還看出當選擇放回抽樣時,取出的元素中會出現重復值。


3.5.2 鍵值對型Transformation算子

RDD的操作算子除了單值型還有鍵值對(Key-Value)型。這里開始介紹鍵值對型的算子,主要包括groupByKey、combineByKey、reduceByKey、sortByKey、cogroup和join,如表3-5所示。

表3-5?鍵值對型Transformation算子

方法名

方法定義

groupByKey

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

combineByKey

def combineByKey[C](createCombiner: V => C, mergeValue: (C,V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)]

reduceByKey

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

sortByKey

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P]

cogroup

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD

join

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

?

1.groupByKey

類似groupBy方法,作用是把每一個相同Key值的的Value聚集起來形成一個序列,可以使用默認分區器和自定義分區器,但是這個方法開銷比較大,如果想對同一Key進行Value的聚合或求平均,則推薦使用aggregateByKey或者reduceByKey。

方法源碼實現:

def groupByKey(numPartitions:Int): RDD[(K, Iterable[V])] = {groupByKey(new HashPartitioner(numPartitions))}def groupByKey(partitioner:Partitioner): RDD[(K, Iterable[V])] = {//groupByKey不應該使用map端的combine操作,因為map端并不會減少shuffle的數據,還要求//所有map端的數據都插入hash表中,導致很多對象進入內存中的老年代。 val createCombiner = (v: V) =>CompactBuffer(v)val mergeValue = (buf: CompactBuffer[V], v:V) => buf += vval mergeCombiners = (c1: CompactBuffer[V],c2: CompactBuffer[V]) => c1 ++= c2val bufs = combineByKey[CompactBuffer[V]](createCombiner, mergeValue,mergeCombiners, partitioner, mapSideCombine=false)bufs.asInstanceOf[RDD[(K, Iterable[V])]]}

【例3-19】groupByKey方法應用樣例

val a = sc.parallelize(List("mk", "zq", "xwc", "fjg", "dcp", "snn"), 2) val b = a.keyBy(x => x.length) // keyBy方法調用map(x => (f(x),x))生成鍵值對 b.groupByKey.collect res6: Array[(Int, Iterable[String])] = Array((2,CompactBuffer(mk, zq)), (3,CompactBuffer(xwc, fjg, dcp, snn)))

這個例子先創建包含List集合對象的RDD,然后使用keyBy方法生成Key-Value鍵值對,然后調用groupByKey方法將相同Key的Value聚合,最后調用collect方法以數組形式輸出。

?

圖3-7? groupByKey方法應用樣例

2.combineByKey

combineByKey方法能高效的將鍵值對形式的RDD按相同的Key把Value合并成序列形式,用戶能自定義RDD的分區器和是否在map端進行聚合操作。

方法源碼實現:

defcombineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = {combineByKey(createCombiner, mergeValue,mergeCombiners, defaultPartitioner(self))}defcombineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C, //輸入2個不同類型參數,返回其中一個類型參數 mergeCombiners: (C, C) => C, //輸入2個同類型參數,返回一個參數 numPartitions: Int): RDD[(K, C)] = {combineByKey(createCombiner,mergeValue, mergeCombiners, new HashPartitioner(numPartitions))}def combineByKey[C](createCombiner:V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)]= {require(mergeCombiners!= null, "mergeCombiners must be defined") // required as of Spark0.9.0if(keyClass.isArray) {if(mapSideCombine) {throw new SparkException("Cannot usemap-side combining with array keys.")}if(partitioner.isInstanceOf[HashPartitioner]) {throw new SparkException("Defaultpartitioner cannot partition array keys.")}}val aggregator = new Aggregator[K, V, C](self.context.clean(createCombiner),self.context.clean(mergeValue),self.context.clean(mergeCombiners))if (self.partitioner == Some(partitioner)){self.mapPartitions(iter => {val context = TaskContext.get()new InterruptibleIterator(context,aggregator.combineValuesByKey(iter, context))}, preservesPartitioning = true)} else {new ShuffledRDD[K, V, C](self,partitioner).setSerializer(serializer).setAggregator(aggregator).setMapSideCombine(mapSideCombine)}}

【例3-20】combineByKey方法應用樣例

val a = sc.parallelize(List("xwc", "fjg", "wc", "dcp", "zq", "snn", "mk", "zl", "hk", "lp"), 2) val b = sc.parallelize(List(1,2,2,3,2,1,2,2,2,3), 2) val c = b.zip(a) //把a和b中對應元素組合成鍵值對,如Array((1,xwc), (3,fjg), (2,wc), (3,dcp)... val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String] = x ::: y) d.collect res13: Array[(Int, List[String])] = Array((2,List(zq, wc, fjg, hk, zl, mk)), (1,List(xwc, snn)), (3,List(dcp, lp)))

在使用zip方法得到鍵值對序列c后調用combineByKey,把相同Key的value進行合并到List中。這個例子中使用三個參數的重載方法,該方法第一個參數是createCombiner,作用是把元素V轉換到另一類型元素C,該例子中使用的是List(_),表示將輸入的元素放在list集合中;mergeValue的含義是把元素V合并到元素C中,在該例子中使用的是函數是x:List[String],y:String) => y :: x,表示將y字符串合并到x鏈表集合中;mergeCombiners含義是將兩個C元素合并,在該例子中使用的是x:List[String], y:List[String]= x ::: y,表示把x鏈表集合中的內容合并到y鏈表集合中。

3.reduceByKey

使用一個reduce函數來實現對相同Key的Value的聚集操作,在發送結果給reduce前會在map端的執行本地merge操作。該方法的底層實現就是調用combineByKey方法的一個重載方法。

方法源碼實現:

def reduceByKey(partitioner:Partitioner, func: (V, V) => V): RDD[(K, V)] = {combineByKey[V]((v: V) => v, func, func,partitioner)}def reduceByKey(func: (V, V)=> V, numPartitions: Int): RDD[(K, V)] = {reduceByKey(newHashPartitioner(numPartitions), func)}def reduceByKey(func: (V, V)=> V): RDD[(K, V)] = {reduceByKey(defaultPartitioner(self), func)}

【例3-21】reduceByKey方法應用樣例

(1)val a = sc.parallelize(List("dcp", "fjg", "snn", "wc", "zq"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey((a,b) => a + b).collect res22: Array[(Int, String)] = Array((2,wczq), (3,dcpfjgsnn)) (2)val a = sc.parallelize(List(3,12,124,32,5 ), 2) val b = a.map(x => (x.toString.length, x)) b.reduceByKey(_ + _).collect res24: Array[(Int, Int)] = Array((2,44), (1,8), (3,124))

這個例子先用map方法映射出鍵值對,然后調用reduceByKey方法對相同Key的Value值進行累加。例子中第一個是使用字符串,故使用聚合相加后是字符串的合并;第二個例子使用的是數字,結果是對應Key的Value數字相加。

4.sortByKey

這個函數會根據Key值對鍵值對進行排序,如果Key是字母,則按字典順序排序,如果Key是數字,則從小到大排序(或從大到小),該方法的第一個參數控制是否為升序排序,當為true時是升序,反之為降序。

方法源碼實現:

def sortByKey(ascending: Boolean = true,numPartitions: Int = self.partitions.size) : RDD[(K, V)] ={val part = new RangePartitioner(numPartitions,self, ascending)new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) orderingelse ordering.reverse)}

【例3-22】sortByKey方法應用樣例

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = sc.parallelize(1 to a.count.toInt, 2) //a.count得到單詞的字母個數 val c = a.zip(b) c.sortByKey(true).collect res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3)) c.sortByKey(false).collect res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))

這個例子先通過zip方法得到包含鍵值對的變量c,然后演示了sortByKey方法中參數為true和false時的計算結果。本例中的key是字符串,故可以看出當Key為true時,結果是按Key的字典順序升序輸出,反之則為降序輸出結果;當key為數字的時候,則按大小排列。

5.cogroup

cogroup是一個比較高效的函數,能根據Key值聚集最多3個鍵值對的RDD,把相同Key值對應的Value聚集起來。

方法源碼實現:

//參數為一個RDD情況 def cogroup[W](other: RDD[(K,W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] = {if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {throw new SparkException("Defaultpartitioner cannot partition array keys.")}val cg = new CoGroupedRDD[K](Seq(self,other), partitioner)cg.mapValues { case Array(vs, w1s) =>(vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W]])}}//參數為兩個RDD情況 def cogroup[W1, W2](other1:RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K,(Iterable[V], Iterable[W1], Iterable[W2]))] = {if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {throw new SparkException("Defaultpartitioner cannot partition array keys.")}val cg = new CoGroupedRDD[K](Seq(self,other1, other2), partitioner)cg.mapValues { case Array(vs, w1s, w2s)=>(vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W1]],w2s.asInstanceOf[Iterable[W2]])}}//參數為3個RDD情況 def cogroup[W1, W2, W3](other1:RDD[(K, W1)],other2: RDD[(K, W2)],other3: RDD[(K, W3)],partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1],Iterable[W2], Iterable[W3]))] = {if (partitioner.isInstanceOf[HashPartitioner]&& keyClass.isArray) {throw new SparkException("Defaultpartitioner cannot partition array keys.")}val cg = new CoGroupedRDD[K](Seq(self,other1, other2, other3), partitioner)cg.mapValues { case Array(vs, w1s, w2s, w3s)=>(vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W1]],w2s.asInstanceOf[Iterable[W2]],w3s.asInstanceOf[Iterable[W3]])}}

【例3-23】cogroup方法應用樣例

(1)val a =sc.parallelize(List(1,2,2 ,3, 1, 3), 1) val b =a.map(x => (x, "b")) val c =a.map(y => (y, "c")) b.cogroup(c).collect res25:Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b,b), CompactBuffer(c,c))), (3,(CompactBuffer(b, b),CompactBuffer(c, c))), (2,(CompactBuffer(b, b),CompactBuffer(c, c)))) (2)val d = a.map(m => (m,"x")) b.cogroup(c,d).collect res26:Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] =Array((1,(CompactBuffer(b,b) ,CompactBuffer(c,c),CompactBuffer(x, x))), (3,(CompactBuffer(b, b),CompactBuffer(c, c), CompactBuffer(x,x))), (2,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(x, x))))

例子中有兩個個小例子,依次是單個參數和兩個參數的情況,使用cogroup方法對單個RDD和2個RDD進行聚集操作。

6.join

對鍵值對的RDD進行cogroup操作,然后對每個新的RDD下Key的值進行笛卡爾積操作,再對返回結果使用flatMapValues方法,最后返回結果。

方法源碼實現:

def join[W](other: RDD[(K, W)],partitioner: Partitioner): RDD[(K, (V, W))] = {this.cogroup(other,partitioner).flatMapValues( pair =>for (v <- pair._1.iterator; w <-pair._2.iterator) yield (v, w))}

【例3-24】join方法應用樣例

val a = sc.parallelize(List("fjg", "wc", "xwc","dcp"), 2) val b = a.keyBy(_.length) //得到諸如(3,"fjg"),(2,"wc")的鍵值對序列 val c = sc.parallelize(List("fjg", "wc", "snn", "zq", "xwc","dcp"), 2) val d = c.keyBy(_.length) b.join(d).collect res29: Array[(Int, (String, String))] = Array((2,(wc,wc)), (2,(wc,zq)), (3,(fjg,fjg)), (3,(fjg,snn)), (3,(fjg,xwc)), (3,(fjg,dcp)), (3,(xwc,fjg)), (3,(xwc,snn)), (3,(xwc,xwc)), (3,(xwc,dcp)), (3,(dcp,fjg)), (3,(dcp,snn)), (3,(dcp,xwc)), (3,(dcp,dcp)))

這個例子先構造兩個包含鍵值對元素的變量b和d,然后調用join方法,得到join后的結果,根據源碼實現,join方法本質是cogroup方法和flatMapValues方法的組合,其中cogroup方法得到聚合值,flatMapValues方法實現的是笛卡爾積,笛卡爾積的過程是在各個分區內進行,如例子中的Key等于2分區,wc與(wc,zq)求笛卡爾積,得到(2,(wc,wc))和(2,(wc,zq))的結果。

?

圖3-8? join方法應用樣例

3.5.3 Action算子

當Spark的計算模型中出現Action算子時才會執行提交作業的runJob動作,這時會觸發后續的DAGScheduler和TaskScheduler工作。這里主要講解常用的Action算子,有collect、reduce、take、top、count、takeSample、saveAsTextFile、countByKey、aggregate,具體方法和定義如表3-6所示。

表3-6? Action算子

方法名

方法定義

collect

def collect(): Array[T]

reduce

def reduce(f: (T, T) => T): T

take

def take(num: Int): Array[T]

top

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

count

def count(): Long

takeSample

def takeSample(withReplacement: Boolean,num: Int,seed: Long = Utils.random.nextLong): Array[T]

saveAsTextFile

def saveAsTextFile(path: String)

countByKey

def countByKey(): Map[K, Long]

aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

1.collect

collect方法的作用是把RDD中的元素以數組的方式返回。

方法源碼實現:

def collect(): Array[T] = {val results = sc.runJob(this, (iter:Iterator[T]) => iter.toArray)Array.concat(results: _*)}

【例3-25】collect方法應用樣例

val c = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 2) c.collect res29: Array[String] = Array(a, b, c, d, e, f)

這個例子直接把RDD中的元素轉換成數組返回。

2.reduce

reduce方法使用一個帶兩個參數的函數把元素進行聚集,返回一個元素結果,注意該函數中的二元操作應該滿足交換律和結合律,這樣才能在并行系統中正確計算。

方法源碼實現:

def reduce(f: (T, T) => T): T = { //輸入是兩個參數的函數,返回一個值 val cleanF = sc.clean(f)val reducePartition: Iterator[T] =>Option[T] = iter => {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None}}var jobResult: Option[T] = Noneval mergeResult = (index: Int, taskResult:Option[T]) => {if (taskResult.isDefined) {jobResult = jobResult match {case Some(value) => Some(f(value,taskResult.get))case None => taskResult}}}sc.runJob(this, reducePartition, mergeResult)//獲得Option的最后結果,或者當RDD為空時拋出異常 jobResult.getOrElse(throw newUnsupportedOperationException("empty collection"))}

【例3-26】reduce方法應用樣例

val a = sc.parallelize(1 to 10) a.reduce((a,b)=> a + b) res41: Int = 55

這個例子使用簡單的函數將輸入的元素相加,過程是先輸入前兩個元素相加,然后將得到的結果與下一個輸入元素相加,依次規則計算出所有元素的和。

3.take

take方法會從RDD中取出前n[1]個元素。方法是先掃描一個分區并后從分區中得到結果,然后評估得到的結果是否達到取出元素個數,如果沒達到則繼續從其他分區中掃描獲取。

方法源碼實現:

def take(num: Int): Array[T] = {if (num == 0) {return new Array[T](0)}val buf = new ArrayBuffer[T]val totalParts = this.partitions.lengthvar partsScanned = 0while (buf.size < num &&partsScanned < totalParts) {// numPartsToTry表示在這個迭代中嘗試的分區數,這個數可以比總分區數大,因為在runJob中的總分區會限定它的值。 var numPartsToTry = 1if (partsScanned > 0) {//如果沒有在之前的迭代中找到記錄,則會重復尋找(次數翻四倍),此外還會調整分 區數,最多調整漲幅不超過50%if (buf.size == 0) {numPartsToTry = partsScanned * 4} else {// the left side of max is >=1whenever partsScanned >= 2 numPartsToTry = Math.max((1.5 * num *partsScanned / buf.size).toInt - partsScanned, 1)numPartsToTry =Math.min(numPartsToTry, partsScanned * 4)}}val left = num - buf.sizeval p = partsScanned untilmath.min(partsScanned + numPartsToTry, totalParts)val res = sc.runJob(this, (it:Iterator[T]) => it.take(left).toArray, p, allowLocal = true)res.foreach(buf ++= _.take(num -buf.size))partsScanned += numPartsToTry}buf.toArray}

【例3-27】take方法應用樣例

(1) val b = sc.parallelize(List("a", "b", "c", "d", "e"), 2) b.take(2) res18: Array[String] = Array(a, b) (2) val b = sc.parallelize(1 to 100, 5) b.take(30) res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,22, 23, 24, 25, 26, 27, 28, 29, 30)

這里例子分別演示了字母和數字情況,其實工作原理都相同,即從分區中按先后順序拿元素出來。

4.top

top方法會利用隱式排序轉換方法(見實現源碼中implicit修飾的方法)來獲取最大的前n個元素。

方法源碼實現:

def top(num: Int)(implicit ord:Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)def takeOrdered(num:Int)(implicit ord: Ordering[T]): Array[T] = {if (num == 0) {Array.empty} else {val mapRDDs = mapPartitions { items =>// Priority keeps the largest elements,so let's reverse the ordering. val queue = newBoundedPriorityQueue[T](num)(ord.reverse)queue ++=util.collection.Utils.takeOrdered(items, num)(ord)Iterator.single(queue)}if (mapRDDs.partitions.size == 0) {Array.empty} else {mapRDDs.reduce { (queue1, queue2) =>queue1 ++= queue2queue1}.toArray.sorted(ord) }}}

【例3-28】top方法應用樣例

val c = sc.parallelize(Array(1, 3, 2,4, 9, 2,11,5), 3) c.top(3) res10:Array[Int] = Array(11, 9, 5)

例子顯示了top的使用方法,很簡潔,直接輸入元素個數作為參數就能得到前n個元素的值。

5.count

count方法計算并返回RDD中元素的個數。

方法源碼實現:

def count(): Long =sc.runJob(this, Utils.getIteratorSize _).sumdef runJob[T, U: ClassTag](rdd:RDD[T], func: Iterator[T] => U): Array[U] = {runJob(rdd, func, 0 until rdd.partitions.size,false)}

【例3-29】count方法應用樣例

val c = sc.parallelize(Array(1,3, 2,4, 9, 2,11,5), 2) c.count res3: Long = 8

6.takeSample

takeSample方法返回一個固定大小的數組形式的采樣子集,此外還把返回的元素順序隨機打亂,方法的三個參數含義依次是否放回數據、返回取樣的大小和隨機數生成器的種子。

方法源碼實現:

def takeSample(withReplacement:Boolean,num: Int,seed: Long = Utils.random.nextLong):Array[T] = {val numStDev = 10.0if (num < 0) {throw newIllegalArgumentException("Negative number of elements requested")} else if (num == 0) {return new Array[T](0)}val initialCount = this.count()if (initialCount == 0) {return new Array[T](0)}val maxSampleSize = Int.MaxValue -(numStDev * math.sqrt(Int.MaxValue)).toIntif (num > maxSampleSize) {throw new IllegalArgumentException("Cannotsupport a sample size > Int.MaxValue - " +s"$numStDev *math.sqrt(Int.MaxValue)")}val rand = new Random(seed)if (!withReplacement && num >=initialCount) {returnUtils.randomizeInPlace(this.collect(), rand)}valfraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,withReplacement)var samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()//如果采樣容量不夠大,則繼續采樣 var numIters = 0while (samples.length < num) {logWarning(s"Needed to re-sample dueto insufficient sample size. Repeat #$numIters")samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()numIters += 1}Utils.randomizeInPlace(samples,rand).take(num)}

【例3-30】takeSample方法應用樣例

val x = sc.parallelize(1 to 100, 2) x.takeSample(true, 30, 1) res13: Array[Int] = Array(72, 37, 96, 47, 40, 96, 57, 100, 8, 44, 82, 11, 32, 47, 99, 94, 37, 97,52, 41, 100, 78, 93, 11, 6, 100, 75, 14, 47, 16)

這個例子直接使用takeSample方法,得到30個固定數字的樣本,采取有放回抽樣的方式。

7.saveAsTextFile

把RDD存儲為文本文件,一次存一行。

方法源碼實現:

def saveAsTextFile(path:String) { val nullWritableClassTag =implicitly[ClassTag[NullWritable]]val textClassTag =implicitly[ClassTag[Text]]val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)}def saveAsTextFile(path:String, codec: Class[_ <: CompressionCodec]) { //參數可選擇壓縮方式// 參考https://issues.apache.org/jira/browse/SPARK-2075 val nullWritableClassTag =implicitly[ClassTag[NullWritable]]val textClassTag =implicitly[ClassTag[Text]]val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)}

【例3-31】saveAsTextFile方法應用樣例

val a = sc.parallelize(1 to 100, 3) a.saveAsTextFile("BIT_Spark") //控制臺打印出的部分日志 15/08/04 10:27:58 INFO FileOutputCommitter: Saved output of task 'attempt_201508041027_0001 _m_000002_5' to file:/home/hadoop/spark/bin/BIT_Spark //在當前路徑下可以看到輸出3個文件part-***,原因是RDD有3個分區,每個分區默認輸出一個文件,SUCCESS文件執行表示成功。 hadoop@master:~/spark/bin/BIT_Spark$ ls part-00000 part-00001 part-00002 _SUCCESS hadoop@master:~/spark/bin/BIT_Spark$ vim part-00000 //查看第一個分區文件的內容 1 2 3 4 5

8.countByKey

類似count方法,不同的是countByKey方法會根據相同的Key計算其對應的Value個數,返回的是map類型的結果。

方法源碼實現:

def countByKey(): Map[K, Long]= self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap

【例3-32】countByKey方法應用樣例

val a = sc.parallelize(List((1, "bit"), (2, "xwc"), (2, "fjg"), (3, "wc"),(3, "wc"),(3, "wc")), 2) a.countByKey res3: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 2,3 -> 3)

這個例子先構造鍵值對變量a,然后使用countByKey方法對相同Key的Value進行統計,過程是先調用mapValue方法把Value映射為1,再reduceByKey到Key和其對應Value的個數。

9.aggregate

aggregate方法先將每個分區里面的元素進行聚合,然后用combine函數將每個分區的結果和初始值(zeroValue)進行combine操作。這個函數最終返回的類型不需要和RDD中元素類型一致。

aggregate有兩個函數seqOp和combOp,這兩個函數都是輸入兩個參數,輸出一個參數,其中seqOp函數可以看成是reduce操作,combOp函數可以看成是第二個reduce操作(一般用于combine各分區結果到一個總體結果),由定義,combOp操作的輸入和輸出類型必須一致。

方法源碼實現:

def aggregate[U:ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {//克隆zero值,因為這個值也會被序列化 var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance()) val cleanSeqOp = sc.clean(seqOp)val cleanCombOp = sc.clean(combOp)val aggregatePartition = (it: Iterator[T])=> it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)val mergeResult = (index: Int, taskResult:U) => jobResult = combOp(jobResult, taskResult)sc.runJob(this, aggregatePartition,mergeResult)jobResult}

【例3-33】aggregate方法應用樣例

/ 分區0的reduce操作是max(0, 2,3) = 3 // 分區1的reduce操作是max(0, 4,5) = 5 // 分區2的reduce操作是max(0, 6,7) = 7 // 最后的combine操作是0 + 3 + 5 + 7 = 15 //注意最后的reduce操作包含初始值 (1)val z = sc.parallelize(List(2,3,4,5,6,7), 3) z.aggregate(0)((a,b) => math.max(a, b), (c,d) => c + d ) res6: Int = 15 // 分區0的reduce操作是max(3, 2,3) = 3 // 分區1的reduce操作是max(3, 4,5) = 5 // 分區2的reduce操作是max(3, 6,7) = 7 // 最后的combine操作是3 + 3 + 5 + 7 = 18 (2)val z = sc.parallelize(List(2,3,4,5,6,7), 3) z.aggregate(3)((a,b) => math.max(a, b), (c,d) => c + d ) res7: Int = 18 (3)val z = sc.parallelize(List("a","b","c","d","e","f"),2) z.aggregate("")(_ + _, _+_) res8: String = defabc (4)val z = sc.parallelize(List("a","b","c","d","e","f"),2) z.aggregate("x")(_ + _, _+_) res9: String = xxdefxabc

在spark中一個分區對應一個task,從源碼來看,zeroValue參與每個分區的seqOp(reduce)方法和最后的combOp(第二個reduce)方法,先對每個分區求reduce,在該例子中是對3個分區分別求Max操作,得到分區最大值,得到的結果參與combOp方法,即把各分區的結果和zeroValue相加最后得到結果值,從前兩個例子可以看出這個操作特點,體現先分后總的思想。

對于后面兩個例子使用的是字符串,aggregate方法的思路一樣,先對各分區求seqOp方法然后再使用combOp方法把各分區的結果聚合相加,得到最終結果。

10.fold

fold方法與aggregate方法原理類似,區別就是少了一個seqOp方法。fold方法是把每個分區的元素進行聚合,然后調用reduce(op)方法處理。

方法源碼實現:

def fold(zeroValue: T)(op: (T,T) => T): T = {//克隆zero值,因為這個值也會被序列化 var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance())val cleanOp = sc.clean(op)val foldPartition = (iter: Iterator[T])=> iter.fold(zeroValue)(cleanOp)val mergeResult = (index: Int, taskResult:T) => jobResult = op(jobResult, taskResult)sc.runJob(this, foldPartition, mergeResult)jobResult}

【例3-34】fold方法應用樣例

// 分區0的reduce操作是0 + 1 + 2 + 3 = 6 // 分區1的reduce操作是0 + 4 + 5 + 6 = 15 // 分區2的reduce操作是0 + 7 + 8 + 9 = 24 // 最后的combine操作是0 + 6 + 15 + 24 = 45 (1)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) a.fold(0)(_ + _) res11:: Int = 45 // 分區0的reduce操作是1 + 1 + 2 + 3 = 7 // 分區1的reduce操作是1 + 4 + 5 + 6 = 16 // 分區2的reduce操作是1 + 7 + 8 + 9 = 25 // 最后的combine操作是1 + 7 + 16 + 25 = 53 (2)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) a.fold(1)(_ + _) res12: Int = 53

?這個例子中的使用方式與aggregate方法非常相似,注意zeroValue參與所有分區計算。fold計算是保證每個分區能獨立計算,它與aggregate最大的區別是aggregate對不同分區提交的最終結果定義了一個專門的comOp函數來處理,而fold方法是采用一個方法來處理aggregate的兩個方法過程。


3.6 本章小結

? ??? ? 本章主要為讀者講述了Spark核心開發部分,其中講述了SparkContext的作用與創建過程,還對RDD的概念模型進行介紹,說明了RDD的Transformation和Action操作的內涵意義。在基本介紹Spark編程模型后在實踐環節列出了主要的Transformation和Action方法的使用范例,同時結合了方法源碼說明范例計算過程。本章為Spark應用基礎,第六章將繼續集合源碼深入介紹RDD的運行機制和Spark調度機制。下一章將逐一介紹Spark的四大編程模型,讓讀者進一步學習并掌握Spark在不同業務場景下的應用。

?

轉自http://blog.csdn.net/xwc35047/article/details/51146622

總結

以上是生活随笔為你收集整理的【核心API开发】Spark入门教程[3]的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

无码乱肉视频免费大全合集 | 亚洲日本一区二区三区在线 | 少妇性俱乐部纵欲狂欢电影 | 精品偷拍一区二区三区在线看 | 久久99精品久久久久久动态图 | aⅴ亚洲 日韩 色 图网站 播放 | 国产av一区二区精品久久凹凸 | 国产va免费精品观看 | 夜精品a片一区二区三区无码白浆 | 狂野欧美性猛xxxx乱大交 | 无码人妻出轨黑人中文字幕 | 丰满肥臀大屁股熟妇激情视频 | 精品乱码久久久久久久 | 亚洲最大成人网站 | 西西人体www44rt大胆高清 | 波多野结衣高清一区二区三区 | 99久久精品无码一区二区毛片 | 亚洲精品久久久久中文第一幕 | 青春草在线视频免费观看 | 一本久道久久综合婷婷五月 | 小泽玛莉亚一区二区视频在线 | 波多野42部无码喷潮在线 | 美女扒开屁股让男人桶 | 国产艳妇av在线观看果冻传媒 | 无遮挡啪啪摇乳动态图 | 亚洲男人av香蕉爽爽爽爽 | a国产一区二区免费入口 | 精品人人妻人人澡人人爽人人 | 国产无遮挡吃胸膜奶免费看 | 人妻与老人中文字幕 | 无码av免费一区二区三区试看 | 亚洲无人区午夜福利码高清完整版 | 国产手机在线αⅴ片无码观看 | 日本熟妇乱子伦xxxx | 久久综合九色综合欧美狠狠 | 国产熟妇高潮叫床视频播放 | 精品久久8x国产免费观看 | 亚洲欧美日韩国产精品一区二区 | 亚洲国产精品一区二区第一页 | 乱码午夜-极国产极内射 | 欧美大屁股xxxxhd黑色 | 国产三级久久久精品麻豆三级 | 精品偷自拍另类在线观看 | 无码国产乱人伦偷精品视频 | 欧美 日韩 亚洲 在线 | 97久久精品无码一区二区 | 国产片av国语在线观看 | 国产人妖乱国产精品人妖 | 亚洲乱码中文字幕在线 | 午夜无码人妻av大片色欲 | 国产成人精品无码播放 | 无遮无挡爽爽免费视频 | 国产97在线 | 亚洲 | 亚洲精品中文字幕 | 久热国产vs视频在线观看 | 99久久亚洲精品无码毛片 | 国产在热线精品视频 | 成人性做爰aaa片免费看 | yw尤物av无码国产在线观看 | 99国产精品白浆在线观看免费 | 一本久道高清无码视频 | 少妇被粗大的猛进出69影院 | 狠狠cao日日穞夜夜穞av | 久久久久久久久蜜桃 | 美女极度色诱视频国产 | 97se亚洲精品一区 | 亚洲va中文字幕无码久久不卡 | 在线a亚洲视频播放在线观看 | 久久精品女人的天堂av | 亚洲人成网站在线播放942 | 99久久人妻精品免费一区 | 女人被爽到呻吟gif动态图视看 | 免费观看激色视频网站 | 无码人妻精品一区二区三区不卡 | 无码人妻黑人中文字幕 | 九九热爱视频精品 | 国产成人人人97超碰超爽8 | 国产在线aaa片一区二区99 | 亚洲码国产精品高潮在线 | 久久99国产综合精品 | 精品无码一区二区三区的天堂 | 高清不卡一区二区三区 | 天堂а√在线地址中文在线 | 男人扒开女人内裤强吻桶进去 | 国产精品久久久久无码av色戒 | 欧美日韩久久久精品a片 | 天堂亚洲2017在线观看 | 久久99久久99精品中文字幕 | 丰腴饱满的极品熟妇 | 永久免费精品精品永久-夜色 | 欧美国产亚洲日韩在线二区 | 内射欧美老妇wbb | 亚洲 a v无 码免 费 成 人 a v | 亚洲啪av永久无码精品放毛片 | 欧美老人巨大xxxx做受 | 午夜理论片yy44880影院 | 国产成人午夜福利在线播放 | 国产成人一区二区三区别 | 亚洲成av人片天堂网无码】 | 久久久久久av无码免费看大片 | 永久黄网站色视频免费直播 | 国产成人人人97超碰超爽8 | 欧美激情内射喷水高潮 | 欧美丰满熟妇xxxx性ppx人交 | 人人妻人人藻人人爽欧美一区 | 四十如虎的丰满熟妇啪啪 | 国产真实伦对白全集 | 国产午夜亚洲精品不卡 | 亚洲七七久久桃花影院 | 欧美日韩在线亚洲综合国产人 | 中文无码精品a∨在线观看不卡 | 欧美野外疯狂做受xxxx高潮 | 国产免费无码一区二区视频 | 国产精品第一国产精品 | 国产成人综合色在线观看网站 | 亚洲午夜久久久影院 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 欧美激情综合亚洲一二区 | 国产舌乚八伦偷品w中 | 日本丰满熟妇videos | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 色一情一乱一伦一区二区三欧美 | 香港三级日本三级妇三级 | 东京热无码av男人的天堂 | 久久综合久久自在自线精品自 | 中文字幕精品av一区二区五区 | 亚洲男人av天堂午夜在 | 亚洲一区二区三区无码久久 | 久久久久成人片免费观看蜜芽 | 日韩人妻无码一区二区三区久久99 | 国产香蕉尹人视频在线 | 婷婷综合久久中文字幕蜜桃三电影 | 亚洲熟妇色xxxxx欧美老妇y | 亚洲国产精品久久久天堂 | 欧美亚洲国产一区二区三区 | 国产精品99久久精品爆乳 | 精品无码国产一区二区三区av | 久久人人爽人人爽人人片av高清 | 中文字幕+乱码+中文字幕一区 | 日韩av无码一区二区三区 | 青春草在线视频免费观看 | 永久免费观看国产裸体美女 | аⅴ资源天堂资源库在线 | 日韩少妇内射免费播放 | 99久久精品日本一区二区免费 | 蜜桃av蜜臀av色欲av麻 999久久久国产精品消防器材 | 亚洲第一网站男人都懂 | 亚洲国产欧美日韩精品一区二区三区 | a片免费视频在线观看 | av无码电影一区二区三区 | 国产av一区二区三区最新精品 | 老司机亚洲精品影院无码 | 精品无人区无码乱码毛片国产 | 精品国产一区av天美传媒 | 欧美丰满少妇xxxx性 | 亚洲综合在线一区二区三区 | 99riav国产精品视频 | 中文无码精品a∨在线观看不卡 | 亚洲精品中文字幕久久久久 | 国产乱人无码伦av在线a | 特级做a爰片毛片免费69 | 精品欧洲av无码一区二区三区 | 国产网红无码精品视频 | 99久久婷婷国产综合精品青草免费 | 在线精品国产一区二区三区 | 一本大道久久东京热无码av | 野外少妇愉情中文字幕 | 成人欧美一区二区三区 | 乱人伦人妻中文字幕无码 | 99精品视频在线观看免费 | 国产无av码在线观看 | 午夜精品久久久久久久久 | 亚洲精品一区三区三区在线观看 | 国产成人综合色在线观看网站 | 国产97在线 | 亚洲 | 99久久精品日本一区二区免费 | 欧美老妇与禽交 | 亚洲综合无码久久精品综合 | 丰满少妇高潮惨叫视频 | 狠狠噜狠狠狠狠丁香五月 | 4hu四虎永久在线观看 | 久久亚洲国产成人精品性色 | 无码国产激情在线观看 | 国产内射老熟女aaaa | 欧美阿v高清资源不卡在线播放 | www成人国产高清内射 | 久久久成人毛片无码 | 亚洲日韩中文字幕在线播放 | 国产亚洲日韩欧美另类第八页 | 欧美日韩一区二区综合 | 久久综合给合久久狠狠狠97色 | 国产电影无码午夜在线播放 | 欧美zoozzooz性欧美 | 中文字幕无码免费久久99 | 久久99精品国产.久久久久 | 亚洲另类伦春色综合小说 | 亚洲精品一区二区三区在线 | 国产在线无码精品电影网 | 国产极品美女高潮无套在线观看 | 国产电影无码午夜在线播放 | www国产亚洲精品久久网站 | 麻豆人妻少妇精品无码专区 | 日日天干夜夜狠狠爱 | 18无码粉嫩小泬无套在线观看 | 国产精品99久久精品爆乳 | 国内少妇偷人精品视频免费 | 欧美真人作爱免费视频 | 熟妇人妻无码xxx视频 | 中文字幕乱码中文乱码51精品 | 成人性做爰aaa片免费看 | 国产成人综合色在线观看网站 | 97夜夜澡人人双人人人喊 | 国产乱人伦av在线无码 | 中文字幕日产无线码一区 | 精品国产成人一区二区三区 | 狂野欧美性猛交免费视频 | 久久精品人人做人人综合试看 | 免费人成网站视频在线观看 | 99久久精品国产一区二区蜜芽 | 国产亚洲精品久久久久久大师 | 无码帝国www无码专区色综合 | 99国产精品白浆在线观看免费 | 亚洲中文字幕在线观看 | 黄网在线观看免费网站 | 欧美三级a做爰在线观看 | 亚洲日本一区二区三区在线 | 无码毛片视频一区二区本码 | 少妇久久久久久人妻无码 | 骚片av蜜桃精品一区 | 日本丰满护士爆乳xxxx | 一本精品99久久精品77 | 乱人伦人妻中文字幕无码 | 国产美女极度色诱视频www | 精品偷自拍另类在线观看 | 久久午夜无码鲁丝片秋霞 | 无套内谢老熟女 | 欧美性黑人极品hd | 双乳奶水饱满少妇呻吟 | 亚洲成色www久久网站 | 国产乱人伦app精品久久 国产在线无码精品电影网 国产国产精品人在线视 | 中文字幕 人妻熟女 | 国产亚洲精品久久久久久久 | 中文字幕精品av一区二区五区 | 中文字幕无码免费久久99 | 麻豆精产国品 | 日本精品人妻无码免费大全 | 欧美丰满少妇xxxx性 | 国内揄拍国内精品人妻 | av香港经典三级级 在线 | 老太婆性杂交欧美肥老太 | 日韩精品乱码av一区二区 | 人妻熟女一区 | 久久熟妇人妻午夜寂寞影院 | 伊在人天堂亚洲香蕉精品区 | 久久视频在线观看精品 | 国产在线一区二区三区四区五区 | 偷窥日本少妇撒尿chinese | 欧美黑人性暴力猛交喷水 | 久久久久av无码免费网 | 亚洲国产精品毛片av不卡在线 | 成人精品一区二区三区中文字幕 | 亚洲欧美日韩综合久久久 | 中文无码伦av中文字幕 | 国产电影无码午夜在线播放 | 乱人伦人妻中文字幕无码久久网 | 国产精品va在线观看无码 | 在教室伦流澡到高潮hnp视频 | 牛和人交xxxx欧美 | 沈阳熟女露脸对白视频 | 无码吃奶揉捏奶头高潮视频 | 亚洲男人av天堂午夜在 | 99麻豆久久久国产精品免费 | 67194成是人免费无码 | 亚洲中文字幕在线无码一区二区 | 成人精品天堂一区二区三区 | 亚洲狠狠色丁香婷婷综合 | 日韩少妇内射免费播放 | 丰满岳乱妇在线观看中字无码 | 中文字幕色婷婷在线视频 | 网友自拍区视频精品 | 成熟人妻av无码专区 | 国产精品久久久久无码av色戒 | 国产办公室秘书无码精品99 | 亚洲区小说区激情区图片区 | 老太婆性杂交欧美肥老太 | 国产在线精品一区二区高清不卡 | 99久久精品国产一区二区蜜芽 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 人人妻人人澡人人爽欧美一区 | 国产精品免费大片 | 国产精品-区区久久久狼 | 色一情一乱一伦一视频免费看 | 亚洲国产精品久久久久久 | 亚洲毛片av日韩av无码 | 亚洲国产欧美国产综合一区 | 亚洲成熟女人毛毛耸耸多 | 成人欧美一区二区三区 | 中文字幕人妻无码一夲道 | 国产乱人伦av在线无码 | 欧美成人免费全部网站 | 国产精品久免费的黄网站 | 亚洲乱码日产精品bd | 久久久久久a亚洲欧洲av冫 | 狠狠噜狠狠狠狠丁香五月 | 无码人妻少妇伦在线电影 | 丁香花在线影院观看在线播放 | 露脸叫床粗话东北少妇 | 日韩亚洲欧美精品综合 | 色婷婷欧美在线播放内射 | 亚洲自偷自偷在线制服 | 四虎国产精品免费久久 | 亚洲日韩一区二区三区 | 亚洲综合另类小说色区 | av小次郎收藏 | 日本xxxx色视频在线观看免费 | 曰韩少妇内射免费播放 | 麻豆国产97在线 | 欧洲 | 午夜免费福利小电影 | 又大又紧又粉嫩18p少妇 | 人妻少妇被猛烈进入中文字幕 | 亚洲午夜久久久影院 | 久青草影院在线观看国产 | 蜜臀av在线观看 在线欧美精品一区二区三区 | 精品亚洲韩国一区二区三区 | 欧美性猛交内射兽交老熟妇 | 亚洲爆乳精品无码一区二区三区 | 人妻有码中文字幕在线 | 国产亚洲人成a在线v网站 | 亚洲狠狠色丁香婷婷综合 | 日本在线高清不卡免费播放 | 国产欧美亚洲精品a | 天天做天天爱天天爽综合网 | 国产精品亚洲专区无码不卡 | 装睡被陌生人摸出水好爽 | 高潮毛片无遮挡高清免费视频 | 精品日本一区二区三区在线观看 | 人妻aⅴ无码一区二区三区 | 少妇人妻大乳在线视频 | 日本一区二区三区免费高清 | 午夜成人1000部免费视频 | 亚洲欧洲中文日韩av乱码 | 久久综合九色综合欧美狠狠 | 国产性生大片免费观看性 | 麻豆精品国产精华精华液好用吗 | 国产激情无码一区二区app | 日日摸天天摸爽爽狠狠97 | 76少妇精品导航 | 日本成熟视频免费视频 | 亚洲综合色区中文字幕 | 国产另类ts人妖一区二区 | 内射巨臀欧美在线视频 | 久久久亚洲欧洲日产国码αv | 国产内射老熟女aaaa | 漂亮人妻洗澡被公强 日日躁 | 日日噜噜噜噜夜夜爽亚洲精品 | 在线播放无码字幕亚洲 | 亚洲а∨天堂久久精品2021 | 丰满人妻翻云覆雨呻吟视频 | 欧美成人高清在线播放 | 中文字幕亚洲情99在线 | 国产成人精品视频ⅴa片软件竹菊 | 午夜无码人妻av大片色欲 | 国产激情无码一区二区 | 日韩精品a片一区二区三区妖精 | 成人动漫在线观看 | 精品夜夜澡人妻无码av蜜桃 | 国产莉萝无码av在线播放 | 亚洲第一无码av无码专区 | 亚洲色欲色欲欲www在线 | 青青青爽视频在线观看 | 亚洲精品国产精品乱码不卡 | 国产又爽又黄又刺激的视频 | 欧美成人家庭影院 | 少妇的肉体aa片免费 | 欧美大屁股xxxxhd黑色 | 一本久久a久久精品亚洲 | 中文字幕乱码中文乱码51精品 | 久久天天躁夜夜躁狠狠 | 在线成人www免费观看视频 | 亚洲日韩中文字幕在线播放 | 内射白嫩少妇超碰 | 欧美精品在线观看 | 久久综合九色综合97网 | 在线观看国产午夜福利片 | 熟妇人妻中文av无码 | 免费观看又污又黄的网站 | 免费播放一区二区三区 | 老熟妇仑乱视频一区二区 | 日韩精品一区二区av在线 | 老熟妇仑乱视频一区二区 | 久久亚洲日韩精品一区二区三区 | 亚洲熟妇自偷自拍另类 | 成人欧美一区二区三区黑人免费 | 大乳丰满人妻中文字幕日本 | 亚洲综合无码久久精品综合 | 日本乱人伦片中文三区 | 未满小14洗澡无码视频网站 | 国产xxx69麻豆国语对白 | 亚洲综合伊人久久大杳蕉 | 牲欲强的熟妇农村老妇女 | 丝袜 中出 制服 人妻 美腿 | 久久这里只有精品视频9 | 亚洲人成影院在线无码按摩店 | 国产人妻精品一区二区三区 | а√资源新版在线天堂 | 免费网站看v片在线18禁无码 | 人妻少妇精品久久 | 国产乱人无码伦av在线a | 精品日本一区二区三区在线观看 | 小鲜肉自慰网站xnxx | 亚洲人成影院在线观看 | 国产综合久久久久鬼色 | 亚洲日本va午夜在线电影 | 国产av久久久久精东av | 亚洲精品久久久久久一区二区 | 中文字幕乱码人妻无码久久 | 中文字幕无码人妻少妇免费 | 国产熟妇另类久久久久 | 国产激情精品一区二区三区 | 欧美成人午夜精品久久久 | 骚片av蜜桃精品一区 | 精品国产麻豆免费人成网站 | 成人三级无码视频在线观看 | 国产办公室秘书无码精品99 | 精品国精品国产自在久国产87 | 欧美xxxxx精品 | 特级做a爰片毛片免费69 | 欧美性猛交xxxx富婆 | 无码人妻黑人中文字幕 | 小鲜肉自慰网站xnxx | 久久精品国产日本波多野结衣 | 一本久久伊人热热精品中文字幕 | 香港三级日本三级妇三级 | 国产麻豆精品一区二区三区v视界 | 日本乱偷人妻中文字幕 | 国产三级精品三级男人的天堂 | 奇米影视7777久久精品 | 麻豆果冻传媒2021精品传媒一区下载 | 大肉大捧一进一出好爽视频 | 午夜嘿嘿嘿影院 | 午夜精品久久久久久久久 | 国产又粗又硬又大爽黄老大爷视 | 中文字幕无码人妻少妇免费 | 麻豆精品国产精华精华液好用吗 | 日产精品高潮呻吟av久久 | 色情久久久av熟女人妻网站 | 激情五月综合色婷婷一区二区 | 欧美亚洲日韩国产人成在线播放 | 欧美黑人性暴力猛交喷水 | 国产精品理论片在线观看 | 亚洲熟妇自偷自拍另类 | 国产综合久久久久鬼色 | 欧美一区二区三区 | 国产偷自视频区视频 | 亚洲精品午夜国产va久久成人 | 99久久人妻精品免费二区 | 桃花色综合影院 | 久久国产劲爆∧v内射 | 久久久久成人精品免费播放动漫 | 亚拍精品一区二区三区探花 | 欧美第一黄网免费网站 | av香港经典三级级 在线 | 熟妇人妻激情偷爽文 | 无码中文字幕色专区 | 国产精品人妻一区二区三区四 | 国产成人亚洲综合无码 | 人妻天天爽夜夜爽一区二区 | 性色欲情网站iwww九文堂 | 无码人妻出轨黑人中文字幕 | 亚洲色欲色欲天天天www | 国产av剧情md精品麻豆 | 沈阳熟女露脸对白视频 | 国产精品办公室沙发 | 东京一本一道一二三区 | 欧美国产日韩亚洲中文 | 国内精品九九久久久精品 | 中文字幕乱码人妻二区三区 | 国产精品亚洲专区无码不卡 | 色五月五月丁香亚洲综合网 | 国产亚洲精品久久久久久大师 | 日日橹狠狠爱欧美视频 | 天堂а√在线地址中文在线 | 国产成人精品三级麻豆 | 国产成人无码av片在线观看不卡 | 最近中文2019字幕第二页 | 久激情内射婷内射蜜桃人妖 | 综合激情五月综合激情五月激情1 | 精品偷拍一区二区三区在线看 | 欧美 丝袜 自拍 制服 另类 | 四虎影视成人永久免费观看视频 | 国产激情综合五月久久 | 男女猛烈xx00免费视频试看 | 97久久超碰中文字幕 | 99久久精品国产一区二区蜜芽 | 在线播放无码字幕亚洲 | 国产尤物精品视频 | 亚洲中文字幕av在天堂 | 精品偷拍一区二区三区在线看 | 午夜福利电影 | 国产97人人超碰caoprom | 女人被爽到呻吟gif动态图视看 | 天堂无码人妻精品一区二区三区 | 国产精品爱久久久久久久 | 丰满人妻翻云覆雨呻吟视频 | 伊人久久婷婷五月综合97色 | 内射后入在线观看一区 | √8天堂资源地址中文在线 | 国产av一区二区三区最新精品 | 国产精品.xx视频.xxtv | 小泽玛莉亚一区二区视频在线 | 少妇无码吹潮 | 一本久久a久久精品vr综合 | 精品国产麻豆免费人成网站 | 欧美日韩在线亚洲综合国产人 | 午夜性刺激在线视频免费 | 波多野结衣高清一区二区三区 | 成人精品一区二区三区中文字幕 | 丝袜 中出 制服 人妻 美腿 | 日日天干夜夜狠狠爱 | 亚洲一区二区三区国产精华液 | 成人女人看片免费视频放人 | 欧美人与物videos另类 | 噜噜噜亚洲色成人网站 | 久久97精品久久久久久久不卡 | 欧美人与善在线com | 麻豆人妻少妇精品无码专区 | 中文字幕无码热在线视频 | 欧美日韩人成综合在线播放 | √天堂中文官网8在线 | 久久精品国产日本波多野结衣 | 天堂久久天堂av色综合 | 51国偷自产一区二区三区 | 亚洲日本在线电影 | 亚洲国产欧美日韩精品一区二区三区 | 人人澡人人透人人爽 | 欧洲美熟女乱又伦 | 又色又爽又黄的美女裸体网站 | 亚洲一区二区三区 | 欧美真人作爱免费视频 | 中文字幕人妻无码一夲道 | 人妻少妇被猛烈进入中文字幕 | 丝袜足控一区二区三区 | 精品国产一区二区三区av 性色 | 高清不卡一区二区三区 | 国产猛烈高潮尖叫视频免费 | 国产精品人妻一区二区三区四 | 综合激情五月综合激情五月激情1 | 爽爽影院免费观看 | 久久亚洲国产成人精品性色 | 国精产品一品二品国精品69xx | 啦啦啦www在线观看免费视频 | 少妇无码av无码专区在线观看 | 色婷婷欧美在线播放内射 | 国产偷国产偷精品高清尤物 | 国产真实夫妇视频 | 婷婷五月综合缴情在线视频 | 久久综合九色综合欧美狠狠 | 天天做天天爱天天爽综合网 | 亚洲成色www久久网站 | 久久99精品国产麻豆 | 日韩人妻无码一区二区三区久久99 | 久久人妻内射无码一区三区 | 亚洲精品综合五月久久小说 | 又色又爽又黄的美女裸体网站 | 无码毛片视频一区二区本码 | 国产精品手机免费 | www国产精品内射老师 | 欧美激情综合亚洲一二区 | 国产在线精品一区二区三区直播 | 内射爽无广熟女亚洲 | 国产人妻精品午夜福利免费 | 扒开双腿疯狂进出爽爽爽视频 | 国产亚洲精品久久久ai换 | 少妇高潮一区二区三区99 | 国产精品人人妻人人爽 | 国产国语老龄妇女a片 | 天堂а√在线地址中文在线 | 狠狠色欧美亚洲狠狠色www | 国産精品久久久久久久 | 亚洲成a人片在线观看无码3d | 少妇愉情理伦片bd | 丝袜 中出 制服 人妻 美腿 | 55夜色66夜色国产精品视频 | 综合网日日天干夜夜久久 | aa片在线观看视频在线播放 | 国产乱子伦视频在线播放 | 亚洲中文字幕va福利 | 国产午夜亚洲精品不卡 | 国产免费无码一区二区视频 | 久久国产精品_国产精品 | 曰本女人与公拘交酡免费视频 | 内射老妇bbwx0c0ck | 又大又硬又爽免费视频 | 天堂久久天堂av色综合 | 人人澡人摸人人添 | 欧美变态另类xxxx | 在线精品亚洲一区二区 | 97久久国产亚洲精品超碰热 | 狂野欧美性猛xxxx乱大交 | 久久亚洲a片com人成 | 伊人久久婷婷五月综合97色 | 精品国产麻豆免费人成网站 | 色综合久久久无码中文字幕 | 亚洲爆乳精品无码一区二区三区 | 亚洲精品一区二区三区大桥未久 | 久久国产精品偷任你爽任你 | 麻豆成人精品国产免费 | 国产熟妇高潮叫床视频播放 | 国产精品久久久午夜夜伦鲁鲁 | 亚洲精品一区二区三区四区五区 | 熟妇人妻无乱码中文字幕 | 免费视频欧美无人区码 | 久久精品国产一区二区三区肥胖 | 少妇无套内谢久久久久 | 国产精品亚洲综合色区韩国 | 色综合久久久无码网中文 | 国产人成高清在线视频99最全资源 | 欧美性黑人极品hd | 国产精品久久久久久亚洲毛片 | 成熟人妻av无码专区 | 亚洲综合精品香蕉久久网 | 三上悠亚人妻中文字幕在线 | 久久精品人妻少妇一区二区三区 | 成熟女人特级毛片www免费 | 少妇人妻av毛片在线看 | 国产精品嫩草久久久久 | 中文字幕av日韩精品一区二区 | 一本加勒比波多野结衣 | 久久久久免费精品国产 | 中文字幕日产无线码一区 | 一区二区三区乱码在线 | 欧洲 | 亚洲色偷偷偷综合网 | 日本一卡二卡不卡视频查询 | 国产精品第一国产精品 | 精品成人av一区二区三区 | 中文字幕乱码人妻无码久久 | 丰满岳乱妇在线观看中字无码 | 亚洲 激情 小说 另类 欧美 | 亚洲爆乳精品无码一区二区三区 | 无码国产色欲xxxxx视频 | 永久免费观看国产裸体美女 | 日本熟妇乱子伦xxxx | 国产小呦泬泬99精品 | 亚洲国产精品久久久久久 | 天堂无码人妻精品一区二区三区 | 精品人妻中文字幕有码在线 | 国产午夜亚洲精品不卡 | 色五月五月丁香亚洲综合网 | www成人国产高清内射 | 国产成人av免费观看 | 国精产品一区二区三区 | av无码不卡在线观看免费 | aⅴ亚洲 日韩 色 图网站 播放 | 亚洲综合色区中文字幕 | 未满小14洗澡无码视频网站 | 欧美日韩一区二区免费视频 | 国产福利视频一区二区 | 色综合久久久无码网中文 | 日本高清一区免费中文视频 | 日本护士xxxxhd少妇 | 天海翼激烈高潮到腰振不止 | 无码人妻精品一区二区三区下载 | 九九热爱视频精品 | 奇米影视7777久久精品 | 国产精品人妻一区二区三区四 | 亚洲中文字幕久久无码 | 国产性生交xxxxx无码 | 欧美日本免费一区二区三区 | 精品无码国产一区二区三区av | aa片在线观看视频在线播放 | 久久久亚洲欧洲日产国码αv | 亚洲精品久久久久avwww潮水 | 国产福利视频一区二区 | 国产特级毛片aaaaaaa高清 | 波多野结衣高清一区二区三区 | 日韩亚洲欧美中文高清在线 | 日韩视频 中文字幕 视频一区 | 日韩精品一区二区av在线 | 日本欧美一区二区三区乱码 | 77777熟女视频在线观看 а天堂中文在线官网 | 六月丁香婷婷色狠狠久久 | 日本乱人伦片中文三区 | 午夜熟女插插xx免费视频 | 粉嫩少妇内射浓精videos | 一个人看的视频www在线 | 中文字幕乱码人妻二区三区 | 久青草影院在线观看国产 | 国内揄拍国内精品少妇国语 | 久久久久国色av免费观看性色 | 久久久久国色av免费观看性色 | 国产人妻久久精品二区三区老狼 | 全黄性性激高免费视频 | 欧美精品免费观看二区 | 男女下面进入的视频免费午夜 | 亚洲欧美中文字幕5发布 | 免费观看又污又黄的网站 | 国产手机在线αⅴ片无码观看 | 日本饥渴人妻欲求不满 | 午夜福利电影 | 在线播放免费人成毛片乱码 | 久久午夜无码鲁丝片午夜精品 | 亚洲中文字幕va福利 | 亚洲国产精品一区二区第一页 | 国产又粗又硬又大爽黄老大爷视 | 精品久久综合1区2区3区激情 | 国产精品福利视频导航 | 天天拍夜夜添久久精品 | 国产成人人人97超碰超爽8 | аⅴ资源天堂资源库在线 | 亚洲精品一区二区三区在线观看 | 荫蒂添的好舒服视频囗交 | 清纯唯美经典一区二区 | 日韩精品无码一本二本三本色 | 荫蒂被男人添的好舒服爽免费视频 | 99riav国产精品视频 | 伊人色综合久久天天小片 | 国产艳妇av在线观看果冻传媒 | 亚洲中文字幕在线无码一区二区 | 国产在线aaa片一区二区99 | 亚洲熟妇自偷自拍另类 | 国产激情综合五月久久 | 国产精品亚洲lv粉色 | 麻豆av传媒蜜桃天美传媒 | 狠狠色噜噜狠狠狠狠7777米奇 | av小次郎收藏 | 亚洲欧美综合区丁香五月小说 | 亚洲 a v无 码免 费 成 人 a v | av无码不卡在线观看免费 | 久热国产vs视频在线观看 | 日韩人妻少妇一区二区三区 | 久久久久久久久888 | 亚洲国产精品久久人人爱 | 300部国产真实乱 | 国产精品久久久av久久久 | 国产av一区二区精品久久凹凸 | 午夜福利试看120秒体验区 | 动漫av一区二区在线观看 | 牲欲强的熟妇农村老妇女视频 | 亚洲狠狠色丁香婷婷综合 | 未满成年国产在线观看 | 精品无人区无码乱码毛片国产 | 国产内射爽爽大片视频社区在线 | 一二三四在线观看免费视频 | 日本一卡2卡3卡4卡无卡免费网站 国产一区二区三区影院 | 国产绳艺sm调教室论坛 | 熟妇人妻无乱码中文字幕 | 300部国产真实乱 | 无码成人精品区在线观看 | 天堂一区人妻无码 | 精品国产一区二区三区四区 | 免费播放一区二区三区 | 色偷偷av老熟女 久久精品人妻少妇一区二区三区 | 人人妻在人人 | 国内少妇偷人精品视频免费 | 亚洲精品国偷拍自产在线观看蜜桃 | aⅴ亚洲 日韩 色 图网站 播放 | 久久久中文久久久无码 | 国产亚洲日韩欧美另类第八页 | 又粗又大又硬毛片免费看 | 樱花草在线播放免费中文 | 成人试看120秒体验区 | 日本www一道久久久免费榴莲 | 国产成人一区二区三区别 | 日本va欧美va欧美va精品 | 亚洲男人av天堂午夜在 | 精品久久久久久人妻无码中文字幕 | 狠狠色欧美亚洲狠狠色www | 中文字幕亚洲情99在线 | 人人爽人人爽人人片av亚洲 | 99精品国产综合久久久久五月天 | 99久久精品国产一区二区蜜芽 | 九月婷婷人人澡人人添人人爽 | 国产av剧情md精品麻豆 | 国产真实夫妇视频 | 丰满护士巨好爽好大乳 | 在线欧美精品一区二区三区 | 亚洲欧洲日本无在线码 | 久久午夜无码鲁丝片 | 亚洲日韩一区二区 | 一本无码人妻在中文字幕免费 | 丰满少妇人妻久久久久久 | 久久无码中文字幕免费影院蜜桃 | 精品国产成人一区二区三区 | 伊在人天堂亚洲香蕉精品区 | 国产内射爽爽大片视频社区在线 | 欧美 日韩 人妻 高清 中文 | 久久熟妇人妻午夜寂寞影院 | 精品夜夜澡人妻无码av蜜桃 | 精品欧美一区二区三区久久久 | 国产精品久久久久无码av色戒 | 亚洲熟悉妇女xxx妇女av | 少妇邻居内射在线 | 国产两女互慰高潮视频在线观看 | 中文字幕乱码亚洲无线三区 | 国语自产偷拍精品视频偷 | 国产绳艺sm调教室论坛 | 丁香花在线影院观看在线播放 | 无码纯肉视频在线观看 | 丰满人妻一区二区三区免费视频 | 大地资源网第二页免费观看 | 亚洲欧美精品aaaaaa片 | 日韩精品无码一本二本三本色 | 精品久久久久久亚洲精品 | 一本大道久久东京热无码av | 黑人粗大猛烈进出高潮视频 | 少妇性俱乐部纵欲狂欢电影 | 97久久超碰中文字幕 | 久久精品国产一区二区三区肥胖 | 麻豆国产97在线 | 欧洲 | 国产艳妇av在线观看果冻传媒 | 丰满人妻精品国产99aⅴ | 国产精品无码mv在线观看 | 红桃av一区二区三区在线无码av | 成 人影片 免费观看 | 国产成人精品视频ⅴa片软件竹菊 | 一个人看的视频www在线 | 国产suv精品一区二区五 | 亚洲精品国偷拍自产在线观看蜜桃 | 男人的天堂av网站 | 久久人人爽人人人人片 | 日日碰狠狠躁久久躁蜜桃 | 亚洲啪av永久无码精品放毛片 | 亚洲国产精品美女久久久久 | 精品厕所偷拍各类美女tp嘘嘘 | 大肉大捧一进一出视频出来呀 | 在线亚洲高清揄拍自拍一品区 | 伦伦影院午夜理论片 | 好男人社区资源 | 午夜福利不卡在线视频 | 中文精品无码中文字幕无码专区 | 国产免费久久精品国产传媒 | 久久精品人妻少妇一区二区三区 | 美女扒开屁股让男人桶 | 午夜无码人妻av大片色欲 | 帮老师解开蕾丝奶罩吸乳网站 | 又大又黄又粗又爽的免费视频 | 色五月丁香五月综合五月 | 国产美女精品一区二区三区 | 亚洲日韩av一区二区三区中文 | 久久精品人人做人人综合试看 | 国产成人一区二区三区在线观看 | 亚洲 高清 成人 动漫 | 丰满少妇女裸体bbw | 少妇性荡欲午夜性开放视频剧场 | 少妇被黑人到高潮喷出白浆 | 少妇无码吹潮 | 精品成在人线av无码免费看 | 日本精品少妇一区二区三区 | 久久99精品久久久久婷婷 | 亚洲精品国产品国语在线观看 | 乱人伦中文视频在线观看 | 国产午夜手机精彩视频 | 国产精品国产三级国产专播 | 东京一本一道一二三区 | 亚洲精品一区二区三区婷婷月 | 国产另类ts人妖一区二区 | 日韩欧美中文字幕在线三区 | 欧美精品无码一区二区三区 | 精品欧美一区二区三区久久久 | 18禁黄网站男男禁片免费观看 | 久久综合九色综合97网 | 国产成人无码区免费内射一片色欲 | 天堂久久天堂av色综合 | 激情内射亚州一区二区三区爱妻 | 精品人人妻人人澡人人爽人人 | 精品无码一区二区三区爱欲 | 国产午夜精品一区二区三区嫩草 | 久久精品国产99久久6动漫 | 波多野结衣av在线观看 | 99久久久无码国产精品免费 | 亚洲一区二区三区香蕉 | 国产成人无码a区在线观看视频app | 动漫av网站免费观看 | 国产精品无码一区二区三区不卡 | 精品久久久久久亚洲精品 | 国产美女极度色诱视频www | 帮老师解开蕾丝奶罩吸乳网站 | 亚洲国产午夜精品理论片 | 婷婷综合久久中文字幕蜜桃三电影 | 97人妻精品一区二区三区 | 男女猛烈xx00免费视频试看 | 色 综合 欧美 亚洲 国产 | 一本久道高清无码视频 | 呦交小u女精品视频 | 国产精品a成v人在线播放 | 中文字幕无码av激情不卡 | 久久综合给久久狠狠97色 | 久久综合给合久久狠狠狠97色 | 曰本女人与公拘交酡免费视频 | 亚洲乱码国产乱码精品精 | 天堂а√在线中文在线 | 成人女人看片免费视频放人 | 一个人看的视频www在线 | 亚洲精品成人av在线 | 暴力强奷在线播放无码 | 国产手机在线αⅴ片无码观看 | 中文字幕无码视频专区 | 国产香蕉97碰碰久久人人 | 女人和拘做爰正片视频 | 国产成人一区二区三区在线观看 | 天海翼激烈高潮到腰振不止 | 亚洲阿v天堂在线 | 熟女体下毛毛黑森林 | 欧美激情一区二区三区成人 | 思思久久99热只有频精品66 | 2020久久香蕉国产线看观看 | 精品少妇爆乳无码av无码专区 | 国产午夜手机精彩视频 | 无遮无挡爽爽免费视频 | √天堂资源地址中文在线 | 人人超人人超碰超国产 | 欧美黑人性暴力猛交喷水 | 国产成人无码av在线影院 | 成人精品视频一区二区三区尤物 | 亚洲成av人片天堂网无码】 | 狠狠综合久久久久综合网 | 亚洲熟悉妇女xxx妇女av | 特大黑人娇小亚洲女 | 欧美性色19p | 亚洲精品成人福利网站 | 久久精品女人天堂av免费观看 | 成人影院yy111111在线观看 | 欧美老人巨大xxxx做受 | 狠狠躁日日躁夜夜躁2020 | 中文字幕乱码人妻二区三区 | 国产网红无码精品视频 | 黑人玩弄人妻中文在线 | 鲁一鲁av2019在线 | 亚洲热妇无码av在线播放 | 久精品国产欧美亚洲色aⅴ大片 | 午夜性刺激在线视频免费 | 无码任你躁久久久久久久 | 亚洲 日韩 欧美 成人 在线观看 | 欧美丰满熟妇xxxx性ppx人交 | 色狠狠av一区二区三区 | 国产在线精品一区二区三区直播 | 四虎永久在线精品免费网址 | 亚欧洲精品在线视频免费观看 | 免费播放一区二区三区 | 欧洲精品码一区二区三区免费看 | 麻豆精品国产精华精华液好用吗 | 久久精品视频在线看15 | 中文字幕亚洲情99在线 | 国产精品人妻一区二区三区四 | 美女张开腿让人桶 | 国产av一区二区三区最新精品 | 国产精品久久国产精品99 | 国产乱人无码伦av在线a | 欧美 亚洲 国产 另类 | 久久精品无码一区二区三区 | 日日橹狠狠爱欧美视频 | 日本一区二区三区免费高清 | 精品无人区无码乱码毛片国产 | 人人澡人人妻人人爽人人蜜桃 | 欧美人与禽猛交狂配 | 欧美丰满老熟妇xxxxx性 | 老熟妇仑乱视频一区二区 | 牲欲强的熟妇农村老妇女视频 | 国产精品久免费的黄网站 | 天天躁夜夜躁狠狠是什么心态 | 亚洲啪av永久无码精品放毛片 | 久久久久成人精品免费播放动漫 | 无套内射视频囯产 | 欧美老人巨大xxxx做受 | 激情国产av做激情国产爱 | 国产精品久久久一区二区三区 | 特级做a爰片毛片免费69 | 国产午夜视频在线观看 | 久久久精品成人免费观看 | 精品无码国产自产拍在线观看蜜 | 中文字幕精品av一区二区五区 | 亚洲色成人中文字幕网站 | 色综合视频一区二区三区 | 中文精品久久久久人妻不卡 | 国产精品成人av在线观看 | 蜜桃臀无码内射一区二区三区 | 97色伦图片97综合影院 | 性色欲网站人妻丰满中文久久不卡 | 久久久久人妻一区精品色欧美 | 久久久久久亚洲精品a片成人 | 精品欧洲av无码一区二区三区 | 内射巨臀欧美在线视频 | 大肉大捧一进一出好爽视频 | 午夜福利一区二区三区在线观看 | 亚洲综合在线一区二区三区 | 领导边摸边吃奶边做爽在线观看 | 97无码免费人妻超级碰碰夜夜 | 国产福利视频一区二区 | www一区二区www免费 | 日本爽爽爽爽爽爽在线观看免 | 欧美丰满少妇xxxx性 | 成 人影片 免费观看 | 无码国模国产在线观看 | 久久精品人妻少妇一区二区三区 | 综合网日日天干夜夜久久 | 亚洲国产成人av在线观看 | 女人被男人躁得好爽免费视频 | 国产精品久久久 | 少妇性俱乐部纵欲狂欢电影 | 久久97精品久久久久久久不卡 | 欧美xxxxx精品 | 奇米影视7777久久精品人人爽 | 日韩 欧美 动漫 国产 制服 | 天堂а√在线地址中文在线 | 激情爆乳一区二区三区 | 国内丰满熟女出轨videos | 国产综合在线观看 | 又湿又紧又大又爽a视频国产 | 99re在线播放 | 伊人久久大香线蕉av一区二区 | 少妇无套内谢久久久久 | 黑森林福利视频导航 | 一区二区三区乱码在线 | 欧洲 | 一个人看的www免费视频在线观看 | 台湾无码一区二区 | 国产 精品 自在自线 | 精品一区二区不卡无码av | 麻豆国产人妻欲求不满谁演的 | 在线播放亚洲第一字幕 | 又黄又爽又色的视频 | 国产黄在线观看免费观看不卡 | 十八禁真人啪啪免费网站 | 无码av最新清无码专区吞精 | 国产在线精品一区二区高清不卡 | 中文字幕人妻无码一夲道 | 性欧美牲交xxxxx视频 | 亲嘴扒胸摸屁股激烈网站 | 自拍偷自拍亚洲精品被多人伦好爽 | 午夜不卡av免费 一本久久a久久精品vr综合 | 午夜精品久久久内射近拍高清 | 青青草原综合久久大伊人精品 | 亚洲小说图区综合在线 | 日本一本二本三区免费 | 天堂在线观看www | 国产做国产爱免费视频 | 99精品久久毛片a片 | 午夜丰满少妇性开放视频 | 日韩亚洲欧美精品综合 | 日本又色又爽又黄的a片18禁 | 欧美性生交xxxxx久久久 | 国产精品久久久久久亚洲毛片 | 日韩无套无码精品 | 亚洲成av人在线观看网址 | 永久免费精品精品永久-夜色 | 无码人妻丰满熟妇区五十路百度 | 性色欲网站人妻丰满中文久久不卡 | 亚洲一区二区三区国产精华液 | 51国偷自产一区二区三区 | 中文精品无码中文字幕无码专区 | 国产农村妇女高潮大叫 | 国产熟妇另类久久久久 | 中文无码伦av中文字幕 | 99国产欧美久久久精品 | 欧美怡红院免费全部视频 | 亚洲国产高清在线观看视频 | 成 人 免费观看网站 | 欧美一区二区三区视频在线观看 | 狂野欧美性猛交免费视频 | 久久精品人人做人人综合试看 | 国内精品一区二区三区不卡 | 蜜臀aⅴ国产精品久久久国产老师 | 清纯唯美经典一区二区 | 欧美阿v高清资源不卡在线播放 | 真人与拘做受免费视频 | 牲欲强的熟妇农村老妇女 | 乱码av麻豆丝袜熟女系列 | 黑森林福利视频导航 | 亚洲经典千人经典日产 | 亚洲综合无码久久精品综合 | 鲁鲁鲁爽爽爽在线视频观看 | 中文字幕乱妇无码av在线 | 国内揄拍国内精品少妇国语 | 久久www免费人成人片 | 日本爽爽爽爽爽爽在线观看免 | 图片区 小说区 区 亚洲五月 | 亚洲国产精品一区二区第一页 | 国产 精品 自在自线 | 欧洲vodafone精品性 | 国产成人无码一二三区视频 | 欧美日韩一区二区免费视频 | 高潮毛片无遮挡高清免费视频 | 丰满岳乱妇在线观看中字无码 | 色五月丁香五月综合五月 | a国产一区二区免费入口 | 在线欧美精品一区二区三区 | 兔费看少妇性l交大片免费 | 亚洲中文字幕在线观看 | 内射欧美老妇wbb | 国产激情无码一区二区app | 伊人久久大香线焦av综合影院 | 色情久久久av熟女人妻网站 | 国产精品无码成人午夜电影 | 免费网站看v片在线18禁无码 | yw尤物av无码国产在线观看 | 亚洲七七久久桃花影院 | 国产激情无码一区二区app | 日本大乳高潮视频在线观看 | 丰满人妻一区二区三区免费视频 | 久久精品国产大片免费观看 | 正在播放东北夫妻内射 | 人妻尝试又大又粗久久 | 日韩 欧美 动漫 国产 制服 | 欧美熟妇另类久久久久久不卡 | 福利一区二区三区视频在线观看 | 伊人色综合久久天天小片 | 国内精品一区二区三区不卡 | 久久久精品成人免费观看 | 狠狠色噜噜狠狠狠狠7777米奇 | 亚洲成av人综合在线观看 | 久激情内射婷内射蜜桃人妖 | 国产香蕉尹人视频在线 | 成人片黄网站色大片免费观看 | 精品久久综合1区2区3区激情 | 熟妇人妻无码xxx视频 | аⅴ资源天堂资源库在线 | 日本又色又爽又黄的a片18禁 | 亚洲自偷自偷在线制服 | 久久国语露脸国产精品电影 | 一个人看的www免费视频在线观看 | 国产国产精品人在线视 | 国产精品无码mv在线观看 | 人人妻人人澡人人爽欧美精品 | 国产精品无套呻吟在线 | 蜜桃视频韩日免费播放 | 中文字幕无码人妻少妇免费 | 久久久久久av无码免费看大片 | 国产国语老龄妇女a片 | 亚洲精品一区二区三区在线观看 | 大乳丰满人妻中文字幕日本 | 色欲人妻aaaaaaa无码 | 欧美日韩人成综合在线播放 | 六月丁香婷婷色狠狠久久 | 国产女主播喷水视频在线观看 | 午夜理论片yy44880影院 | 久久精品中文字幕大胸 | 性生交大片免费看l | 天天做天天爱天天爽综合网 | 岛国片人妻三上悠亚 | 亚洲精品午夜无码电影网 | 成人无码影片精品久久久 | 国产精品久久国产精品99 | 牲欲强的熟妇农村老妇女 | 台湾无码一区二区 | 久久久久久久人妻无码中文字幕爆 | 一区二区传媒有限公司 | 日韩精品乱码av一区二区 | 精品人妻人人做人人爽夜夜爽 | 国内精品久久毛片一区二区 | 网友自拍区视频精品 | √天堂中文官网8在线 | 天天拍夜夜添久久精品大 | 在线а√天堂中文官网 | 99riav国产精品视频 | 在线看片无码永久免费视频 | 乱人伦人妻中文字幕无码久久网 | 欧美xxxx黑人又粗又长 | 伊人久久大香线焦av综合影院 | 麻花豆传媒剧国产免费mv在线 | 久久午夜无码鲁丝片 | 精品久久综合1区2区3区激情 | 97无码免费人妻超级碰碰夜夜 | 午夜精品一区二区三区的区别 | 5858s亚洲色大成网站www | 亚洲一区二区三区含羞草 | 欧美成人免费全部网站 | 51国偷自产一区二区三区 | 四虎永久在线精品免费网址 | 久久久久久久久888 | 自拍偷自拍亚洲精品10p | 亚洲成熟女人毛毛耸耸多 | 国产成人一区二区三区在线观看 | 婷婷六月久久综合丁香 | 无码人妻av免费一区二区三区 | 玩弄人妻少妇500系列视频 | 老子影院午夜精品无码 | 狠狠亚洲超碰狼人久久 | 欧美xxxx黑人又粗又长 | 久久精品国产大片免费观看 | 午夜福利一区二区三区在线观看 | 国产九九九九九九九a片 | 色五月五月丁香亚洲综合网 | 亚洲人成影院在线观看 | 亚洲国产欧美在线成人 | 亚洲综合无码久久精品综合 | 人妻与老人中文字幕 | 国产成人一区二区三区别 | 成人无码视频在线观看网站 | 亚洲中文字幕在线无码一区二区 | 国产在线精品一区二区高清不卡 | 99久久婷婷国产综合精品青草免费 | 熟女俱乐部五十路六十路av | 精品久久久久久亚洲精品 | 精品国产福利一区二区 | 日本一区二区三区免费高清 | 国产精品手机免费 | 人人爽人人爽人人片av亚洲 | 少妇厨房愉情理9仑片视频 | 国产艳妇av在线观看果冻传媒 | 300部国产真实乱 | 日韩精品成人一区二区三区 | 中文字幕人妻无码一区二区三区 | 午夜精品久久久内射近拍高清 | 亚洲成av人影院在线观看 | 色一情一乱一伦一区二区三欧美 | 97精品国产97久久久久久免费 | 人妻中文无码久热丝袜 | 1000部啪啪未满十八勿入下载 | 少妇邻居内射在线 | 亚洲经典千人经典日产 | 精品人人妻人人澡人人爽人人 | 无码人妻少妇伦在线电影 | 国产农村妇女高潮大叫 | yw尤物av无码国产在线观看 | 免费人成网站视频在线观看 | 青青青爽视频在线观看 | 久久99久久99精品中文字幕 | 国产99久久精品一区二区 | 国产 精品 自在自线 | 一本大道伊人av久久综合 | 亚洲日韩av片在线观看 | 人妻中文无码久热丝袜 | 久久精品女人的天堂av | 在线视频网站www色 | 久久久久av无码免费网 | 大色综合色综合网站 | 久久五月精品中文字幕 | 国产精品办公室沙发 | 水蜜桃色314在线观看 | 国产精品无套呻吟在线 | 中文字幕无码日韩欧毛 | 久久人妻内射无码一区三区 | 精品偷拍一区二区三区在线看 | 欧美日韩一区二区三区自拍 | 国产高清av在线播放 | 人妻无码久久精品人妻 | 亚洲成a人片在线观看无码3d | 久久久久av无码免费网 | 国产av一区二区精品久久凹凸 | 成人片黄网站色大片免费观看 | 成在人线av无码免观看麻豆 | 国产香蕉97碰碰久久人人 | 久久精品无码一区二区三区 | 九九热爱视频精品 | 天天躁夜夜躁狠狠是什么心态 | 欧美成人免费全部网站 | 亚洲男女内射在线播放 | 日本一卡2卡3卡四卡精品网站 | 亚洲日韩精品欧美一区二区 | 亚洲aⅴ无码成人网站国产app | 一个人免费观看的www视频 | 影音先锋中文字幕无码 | 精品偷拍一区二区三区在线看 | 最近的中文字幕在线看视频 | 国产明星裸体无码xxxx视频 | 精品 日韩 国产 欧美 视频 | 黑人巨大精品欧美一区二区 | 国产精品理论片在线观看 | www成人国产高清内射 | 亚洲日韩一区二区 | 亚洲啪av永久无码精品放毛片 | 国产精品人妻一区二区三区四 | 西西人体www44rt大胆高清 | 小sao货水好多真紧h无码视频 | 亚洲国产av精品一区二区蜜芽 | 少妇人妻偷人精品无码视频 | 国产精品.xx视频.xxtv | 老头边吃奶边弄进去呻吟 | 精品国产乱码久久久久乱码 | 欧美日韩综合一区二区三区 | 狠狠色丁香久久婷婷综合五月 | 乱码av麻豆丝袜熟女系列 | 成人精品一区二区三区中文字幕 | 日本精品人妻无码免费大全 | 98国产精品综合一区二区三区 | 无码人妻久久一区二区三区不卡 | a在线观看免费网站大全 | 欧美激情内射喷水高潮 | 精品夜夜澡人妻无码av蜜桃 | 亚洲人成网站在线播放942 | 好屌草这里只有精品 | 欧美人与禽zoz0性伦交 | 天天做天天爱天天爽综合网 | 狠狠噜狠狠狠狠丁香五月 | 国产熟妇另类久久久久 | 亚洲色欲色欲欲www在线 | 国产无遮挡又黄又爽又色 | 美女张开腿让人桶 | 亚洲自偷自偷在线制服 | 日产精品高潮呻吟av久久 | 国产无遮挡又黄又爽免费视频 | 亚洲日韩av一区二区三区中文 | 国内精品九九久久久精品 | 少妇性俱乐部纵欲狂欢电影 | 99久久99久久免费精品蜜桃 | 色婷婷av一区二区三区之红樱桃 | 亚洲精品国产第一综合99久久 | 99久久人妻精品免费一区 | 欧美大屁股xxxxhd黑色 | 精品国精品国产自在久国产87 | 精品久久久久久人妻无码中文字幕 | 久久精品国产精品国产精品污 | 日韩少妇白浆无码系列 | 欧美日韩亚洲国产精品 | 免费播放一区二区三区 | 亚洲一区二区三区香蕉 | 内射巨臀欧美在线视频 | 日日天干夜夜狠狠爱 | 亚洲乱码国产乱码精品精 | 野外少妇愉情中文字幕 | 性色欲情网站iwww九文堂 | 131美女爱做视频 | 中文久久乱码一区二区 | 夫妻免费无码v看片 | 欧美精品无码一区二区三区 | 正在播放东北夫妻内射 | 日日天日日夜日日摸 | 麻豆精品国产精华精华液好用吗 | 亚洲男女内射在线播放 | 99久久婷婷国产综合精品青草免费 | 99精品无人区乱码1区2区3区 | 国产免费观看黄av片 | 欧美人与物videos另类 | 国产av人人夜夜澡人人爽麻豆 | 久久天天躁狠狠躁夜夜免费观看 | 国产精品二区一区二区aⅴ污介绍 | 国产午夜福利100集发布 | 免费国产成人高清在线观看网站 | 综合人妻久久一区二区精品 | 日本欧美一区二区三区乱码 | 免费视频欧美无人区码 | 久久久久久久久蜜桃 | 亚洲欧美日韩综合久久久 | 欧美高清在线精品一区 | 丰腴饱满的极品熟妇 | 色老头在线一区二区三区 | 国产高潮视频在线观看 | 成人精品天堂一区二区三区 | 伊人久久大香线焦av综合影院 | 精品国产一区二区三区av 性色 | 一本久道高清无码视频 | 久久久久久国产精品无码下载 | 蜜桃av抽搐高潮一区二区 | 1000部啪啪未满十八勿入下载 | 久久婷婷五月综合色国产香蕉 | 十八禁真人啪啪免费网站 | 男女作爱免费网站 | 人妻有码中文字幕在线 | 中文字幕无码人妻少妇免费 | 久久综合久久自在自线精品自 | 亚洲乱码中文字幕在线 | 国产精品高潮呻吟av久久 | 自拍偷自拍亚洲精品被多人伦好爽 | 国内精品一区二区三区不卡 | 成人欧美一区二区三区黑人 | 丝袜 中出 制服 人妻 美腿 | 伊人色综合久久天天小片 | 国产精品亚洲综合色区韩国 | 无码国内精品人妻少妇 | 久久久精品456亚洲影院 | 国精品人妻无码一区二区三区蜜柚 | 狠狠色噜噜狠狠狠狠7777米奇 | 亚洲s码欧洲m码国产av | av小次郎收藏 | 亚洲成熟女人毛毛耸耸多 | 亚洲一区二区三区国产精华液 | 亚洲精品无码人妻无码 | 一本久道久久综合狠狠爱 | 国产亚洲tv在线观看 | 久久婷婷五月综合色国产香蕉 | 久激情内射婷内射蜜桃人妖 | √天堂资源地址中文在线 | 又湿又紧又大又爽a视频国产 | 中文字幕 亚洲精品 第1页 | 亚洲精品鲁一鲁一区二区三区 | 中文毛片无遮挡高清免费 | 国产亚洲欧美在线专区 | 久久综合网欧美色妞网 | 精品久久久久久人妻无码中文字幕 | √天堂中文官网8在线 | 久久综合网欧美色妞网 | 激情国产av做激情国产爱 | 国产精品亚洲lv粉色 | 又大又硬又爽免费视频 | 久久久久久av无码免费看大片 | 亚洲精品国产品国语在线观看 | 精品一区二区不卡无码av | 国产亚洲精品久久久久久国模美 | 久久精品99久久香蕉国产色戒 | 午夜精品久久久内射近拍高清 | 成人影院yy111111在线观看 | 蜜臀av无码人妻精品 | 久久久久久久女国产乱让韩 | 欧美激情综合亚洲一二区 | 丰腴饱满的极品熟妇 | 亚洲区欧美区综合区自拍区 | 亚洲精品中文字幕乱码 | 国产精品国产三级国产专播 | 亚洲精品一区国产 | 国语精品一区二区三区 | 曰韩少妇内射免费播放 | 国内精品人妻无码久久久影院 | 国产av人人夜夜澡人人爽麻豆 | 中文字幕人妻无码一区二区三区 | 日韩在线不卡免费视频一区 | 精品偷自拍另类在线观看 | 亚洲国产成人av在线观看 | 国产av无码专区亚洲awww | 两性色午夜视频免费播放 | 在线a亚洲视频播放在线观看 | 午夜丰满少妇性开放视频 | 领导边摸边吃奶边做爽在线观看 | 欧美 丝袜 自拍 制服 另类 | 国产成人精品无码播放 | 内射老妇bbwx0c0ck | 日产精品99久久久久久 | 欧洲熟妇精品视频 | 青青久在线视频免费观看 | 久久亚洲a片com人成 | 久久99精品久久久久久 | 色综合天天综合狠狠爱 | 东京热一精品无码av | 精品国精品国产自在久国产87 | 三上悠亚人妻中文字幕在线 | 老熟妇仑乱视频一区二区 | 狠狠色噜噜狠狠狠7777奇米 | 无码播放一区二区三区 | 久久这里只有精品视频9 | 欧美日本日韩 | 性生交大片免费看女人按摩摩 | 欧洲极品少妇 | 精品一区二区三区波多野结衣 | 亚洲色在线无码国产精品不卡 | 欧美日韩综合一区二区三区 | 在线天堂新版最新版在线8 | 色妞www精品免费视频 | 少妇无套内谢久久久久 | 国产av一区二区三区最新精品 | 亚洲爆乳无码专区 | 日韩精品乱码av一区二区 | 撕开奶罩揉吮奶头视频 | 久久精品人人做人人综合 | 亚洲毛片av日韩av无码 | 色一情一乱一伦 | 99精品国产综合久久久久五月天 | 午夜无码人妻av大片色欲 | 在线观看国产午夜福利片 | 精品一区二区三区波多野结衣 | 亚洲狠狠色丁香婷婷综合 | 国产精品99爱免费视频 | 免费国产黄网站在线观看 | 国产精品毛多多水多 | 精品国产青草久久久久福利 | aⅴ亚洲 日韩 色 图网站 播放 | 欧美 丝袜 自拍 制服 另类 | 小鲜肉自慰网站xnxx | 日韩精品无码一本二本三本色 | 精品无码国产自产拍在线观看蜜 | 久久视频在线观看精品 | 18精品久久久无码午夜福利 | 午夜精品久久久久久久 | 国产精品久久久久无码av色戒 | 秋霞特色aa大片 | 国产亚洲美女精品久久久2020 | 国产成人无码av在线影院 | 国产精品免费大片 | 黑人巨大精品欧美一区二区 | 2020最新国产自产精品 | 女人高潮内射99精品 | 国产猛烈高潮尖叫视频免费 | 中文字幕日韩精品一区二区三区 | 蜜桃臀无码内射一区二区三区 | 国产美女极度色诱视频www | 在线观看国产午夜福利片 | 无码毛片视频一区二区本码 | 久久综合给合久久狠狠狠97色 | 人人妻人人澡人人爽欧美一区九九 | 成熟人妻av无码专区 | 人人澡人人透人人爽 | 成熟女人特级毛片www免费 | 初尝人妻少妇中文字幕 | 日欧一片内射va在线影院 | 国产莉萝无码av在线播放 | 18禁黄网站男男禁片免费观看 | 欧洲vodafone精品性 | 未满成年国产在线观看 | 午夜熟女插插xx免费视频 | 国产色视频一区二区三区 | 天堂在线观看www | 色五月丁香五月综合五月 | 亚洲欧美日韩成人高清在线一区 | 欧美国产日韩久久mv | 97久久超碰中文字幕 | 奇米影视7777久久精品 | 亚洲精品国产精品乱码视色 | 久久国产精品_国产精品 | 亚洲精品久久久久中文第一幕 | 无码人妻久久一区二区三区不卡 | 2020久久超碰国产精品最新 | 又黄又爽又色的视频 | 东京无码熟妇人妻av在线网址 | 97久久国产亚洲精品超碰热 | 久久午夜无码鲁丝片午夜精品 | 亚洲精品一区二区三区在线观看 | 九九在线中文字幕无码 | 天干天干啦夜天干天2017 | 天海翼激烈高潮到腰振不止 | 国产手机在线αⅴ片无码观看 | 亚洲 欧美 激情 小说 另类 | 国产精品沙发午睡系列 | 国产色xx群视频射精 | 水蜜桃色314在线观看 | 十八禁视频网站在线观看 | 丁香啪啪综合成人亚洲 | 中文无码伦av中文字幕 | 人妻天天爽夜夜爽一区二区 | 999久久久国产精品消防器材 | 国产欧美精品一区二区三区 | 激情内射亚州一区二区三区爱妻 | 自拍偷自拍亚洲精品被多人伦好爽 | 国色天香社区在线视频 | 无码人妻精品一区二区三区下载 | 亚洲精品欧美二区三区中文字幕 | 国产性猛交╳xxx乱大交 国产精品久久久久久无码 欧洲欧美人成视频在线 | 性啪啪chinese东北女人 | 国产午夜亚洲精品不卡下载 | 蜜桃无码一区二区三区 | 性史性农村dvd毛片 | 国产精品无码mv在线观看 | 天堂在线观看www | 亚洲精品午夜国产va久久成人 | 一个人看的www免费视频在线观看 | 亚洲精品一区二区三区四区五区 | 欧美日本免费一区二区三区 | 暴力强奷在线播放无码 | 精品人妻人人做人人爽夜夜爽 | 狂野欧美性猛交免费视频 | 好爽又高潮了毛片免费下载 | 理论片87福利理论电影 | 人妻少妇被猛烈进入中文字幕 | 老司机亚洲精品影院 | 在线天堂新版最新版在线8 | 综合激情五月综合激情五月激情1 | 少妇被黑人到高潮喷出白浆 | 色欲人妻aaaaaaa无码 | 久久久久亚洲精品中文字幕 | 精品欧美一区二区三区久久久 | 欧美精品一区二区精品久久 | 欧美性生交活xxxxxdddd | 国产乱码精品一品二品 | 亚拍精品一区二区三区探花 | 亚洲国产精品美女久久久久 | 无遮挡啪啪摇乳动态图 | 日韩在线不卡免费视频一区 | 国产97人人超碰caoprom | 国产农村妇女aaaaa视频 撕开奶罩揉吮奶头视频 | 欧美成人免费全部网站 | 国产午夜无码视频在线观看 | 牲交欧美兽交欧美 | 无遮挡啪啪摇乳动态图 | 亚洲熟悉妇女xxx妇女av | 国产亚洲精品久久久久久 | 亚洲 另类 在线 欧美 制服 | 女高中生第一次破苞av | 国产香蕉97碰碰久久人人 | 又黄又爽又色的视频 | 久青草影院在线观看国产 | 99久久久无码国产aaa精品 | 国产精品无码一区二区三区不卡 | 国产综合久久久久鬼色 | 极品尤物被啪到呻吟喷水 | 麻豆精品国产精华精华液好用吗 | 久久99精品久久久久久 | 国产成人久久精品流白浆 | 中文字幕亚洲情99在线 | 一个人看的www免费视频在线观看 | 精品国产一区av天美传媒 | 欧美亚洲日韩国产人成在线播放 | 国产手机在线αⅴ片无码观看 | 免费观看黄网站 | 久久国产精品偷任你爽任你 | 一本加勒比波多野结衣 | 玩弄中年熟妇正在播放 | 双乳奶水饱满少妇呻吟 | 亚洲а∨天堂久久精品2021 | 国产高清av在线播放 | 久久午夜无码鲁丝片午夜精品 | 日韩人妻无码一区二区三区久久99 | 伊人色综合久久天天小片 | 亚洲s色大片在线观看 | 蜜桃视频插满18在线观看 | 免费国产黄网站在线观看 | 中文字幕无码免费久久99 | 熟妇人妻中文av无码 | 女人和拘做爰正片视频 | 一个人看的视频www在线 | 精品国产青草久久久久福利 | 国产综合在线观看 | 国产亚洲精品久久久闺蜜 | 好男人www社区 | 亚洲乱码中文字幕在线 | 成人一在线视频日韩国产 | 99久久无码一区人妻 | 欧美日韩一区二区免费视频 | 欧美性黑人极品hd | 欧美怡红院免费全部视频 | 无码午夜成人1000部免费视频 | 午夜成人1000部免费视频 | 国产又粗又硬又大爽黄老大爷视 | 九九热爱视频精品 | 未满成年国产在线观看 | 亚洲熟妇自偷自拍另类 | 日日天干夜夜狠狠爱 | 亚洲国产av美女网站 | www成人国产高清内射 | www国产精品内射老师 | 亚洲人成影院在线观看 | 国产口爆吞精在线视频 | 丰满少妇人妻久久久久久 | 亚洲娇小与黑人巨大交 | 亚洲综合伊人久久大杳蕉 | 伦伦影院午夜理论片 | 又大又硬又爽免费视频 | 国内精品人妻无码久久久影院蜜桃 | 国产精品人妻一区二区三区四 | 国产三级久久久精品麻豆三级 | 国产成人无码区免费内射一片色欲 | 人妻少妇精品久久 | 国产无遮挡吃胸膜奶免费看 | 精品久久久久久人妻无码中文字幕 | 国内精品九九久久久精品 | 在教室伦流澡到高潮hnp视频 | 奇米影视888欧美在线观看 | 粉嫩少妇内射浓精videos | а√天堂www在线天堂小说 | 狠狠色色综合网站 | 成在人线av无码免费 | 99久久人妻精品免费一区 | 欧美35页视频在线观看 | 丰满少妇人妻久久久久久 | 国产 精品 自在自线 | 妺妺窝人体色www婷婷 | 蜜桃臀无码内射一区二区三区 | 国产在线无码精品电影网 | 欧美熟妇另类久久久久久不卡 | 中文字幕乱码亚洲无线三区 | 精品乱码久久久久久久 | 又大又紧又粉嫩18p少妇 | 欧美熟妇另类久久久久久多毛 | 乌克兰少妇性做爰 | 又大又硬又爽免费视频 | 亚洲爆乳无码专区 | 乱码午夜-极国产极内射 | 国产午夜亚洲精品不卡下载 | a国产一区二区免费入口 | 欧美人妻一区二区三区 | 国产免费久久久久久无码 | 福利一区二区三区视频在线观看 | 国产高潮视频在线观看 | 国产xxx69麻豆国语对白 | 国产亚洲精品久久久久久国模美 | 天天摸天天碰天天添 | 丰满少妇弄高潮了www | 国产一区二区三区精品视频 | 欧美刺激性大交 | 国产无遮挡吃胸膜奶免费看 | 久激情内射婷内射蜜桃人妖 | 亚洲国产精品一区二区美利坚 | 欧美大屁股xxxxhd黑色 | 未满小14洗澡无码视频网站 | 国内揄拍国内精品少妇国语 | 色 综合 欧美 亚洲 国产 | 99国产精品白浆在线观看免费 | 久久久久人妻一区精品色欧美 | 精品国产一区二区三区四区 | 国产精品二区一区二区aⅴ污介绍 | 18禁黄网站男男禁片免费观看 |