《Scala机器学习》一一3.3 应用
本節(jié)書摘來自華章計(jì)算機(jī)《Scala機(jī)器學(xué)習(xí)》一書中的第3章,第3.3節(jié),作者:[美] 亞歷克斯·科茲洛夫(Alex Kozlov),更多章節(jié)內(nèi)容可以訪問云棲社區(qū)“華章計(jì)算機(jī)”公眾號(hào)查看。
3.3 應(yīng)用
下面會(huì)介紹Spark/Scala中的一些實(shí)際示例和庫,具體會(huì)從一個(gè)非常經(jīng)典的單詞計(jì)數(shù)問題開始。
3.3.1 單詞計(jì)數(shù)
大多數(shù)現(xiàn)代機(jī)器學(xué)習(xí)算法需要多次傳遞數(shù)據(jù)。如果數(shù)據(jù)能存放在單臺(tái)機(jī)器的內(nèi)存中,則該數(shù)據(jù)會(huì)容易獲得,并且不會(huì)呈現(xiàn)性能瓶頸。如果數(shù)據(jù)太大,單臺(tái)機(jī)器的內(nèi)存容納不下,則可保存在磁盤(或數(shù)據(jù)庫)上,這樣雖然可得到更大的存儲(chǔ)空間,但存取速度大約會(huì)降為原來的1/100。另外還有一種方式就是分割數(shù)據(jù)集,將其存儲(chǔ)在網(wǎng)絡(luò)中的多臺(tái)機(jī)器上,并通過網(wǎng)絡(luò)來傳輸結(jié)果。雖然對(duì)這種方式仍有爭(zhēng)議,但分析表明,對(duì)于大多數(shù)實(shí)際系統(tǒng)而言,如果能有效地在多個(gè)CPU之間拆分工作負(fù)載,則通過一組網(wǎng)絡(luò)連接節(jié)點(diǎn)存儲(chǔ)數(shù)據(jù)比從單個(gè)節(jié)點(diǎn)上的硬盤重復(fù)存儲(chǔ)和讀取數(shù)據(jù)略有優(yōu)勢(shì)。
磁盤的平均帶寬約為100 MB/s,由于磁盤的轉(zhuǎn)速和緩存不同,其傳輸時(shí)會(huì)有幾毫秒的延遲。相對(duì)于直接從內(nèi)存中讀取數(shù)據(jù),速度要降為原來的1/100左右,當(dāng)然,這也會(huì)取決于數(shù)據(jù)大小和緩存的實(shí)現(xiàn)。現(xiàn)代數(shù)據(jù)總線可以超過10 GB/s的速度傳輸數(shù)據(jù)。而網(wǎng)絡(luò)速度仍然落后于直接的內(nèi)存訪問,特別是標(biāo)準(zhǔn)網(wǎng)絡(luò)層中TCP/IP內(nèi)核的開銷會(huì)對(duì)網(wǎng)絡(luò)速度影響很大。但專用硬件可以達(dá)到每秒幾十吉字節(jié),如果并行運(yùn)行,則可能和從內(nèi)存讀取一樣快。當(dāng)前的網(wǎng)絡(luò)傳輸速度介于1~10 GB/s之間,但在實(shí)際應(yīng)用中仍然比磁盤更快。因此,可以將數(shù)據(jù)分配到集群節(jié)點(diǎn)中所有機(jī)器的內(nèi)存中,并在集群上執(zhí)行迭代機(jī)器學(xué)習(xí)算法。
但內(nèi)存也有一個(gè)問題:在節(jié)點(diǎn)出現(xiàn)故障并重新啟動(dòng)后,內(nèi)存中的數(shù)據(jù)不會(huì)跨節(jié)點(diǎn)持久保存。一個(gè)流行的大數(shù)據(jù)框架Hadoop解決了這個(gè)問題。Hadoop受益于Dean/Ghemawat的論文(Jeff Dean和Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters, OSDI, 2004.),這篇文章提出使用磁盤層持久性來保證容錯(cuò)和存儲(chǔ)中間結(jié)果。Hadoop MapReduce程序首先會(huì)在數(shù)據(jù)集的每一行上運(yùn)行map函數(shù),得到一個(gè)或多個(gè)鍵/值對(duì)。然后按鍵值對(duì)這些鍵/值對(duì)進(jìn)行排序、分組和聚合,使得具有相同鍵的記錄最終會(huì)在同一個(gè)reducer上處理,該reducer可能在一個(gè)(或多個(gè))節(jié)點(diǎn)上運(yùn)行。reducer會(huì)使用一個(gè)reduce函數(shù),遍歷同一個(gè)鍵對(duì)應(yīng)的所有值,并將它們聚合在一起。如果reducer因?yàn)橐恍┰蚴?#xff0c;由于其中間結(jié)果持久保存,則可以丟棄部分計(jì)算,然后可從檢查點(diǎn)保存的結(jié)果重新開始reduce計(jì)算。很多簡(jiǎn)單的類ETL應(yīng)用程序僅在保留非常少的狀態(tài)信息的情況下才遍歷數(shù)據(jù)集,這些狀態(tài)信息是從一個(gè)記錄到另一個(gè)記錄的。
單詞計(jì)數(shù)是MapReduce的經(jīng)典應(yīng)用程序。該程序可統(tǒng)計(jì)文檔中每個(gè)單詞的出現(xiàn)次數(shù)。在Scala中,對(duì)排好序的單詞列表采用foldLeft方法,很容易得到單詞計(jì)數(shù)。
如果運(yùn)行這個(gè)程序,會(huì)輸出(字,計(jì)數(shù))這樣的元組列表。該程序會(huì)按行來分詞,并對(duì)得到的單詞排序,然后將每個(gè)單詞與(字,計(jì)數(shù))元組列表中的最新條目(entry)進(jìn)行匹配。同樣的計(jì)算在MapReduce中會(huì)表示成如下形式:
首先需要按行處理文本,將行拆分成單詞,并生成(word,1)對(duì)。這個(gè)任務(wù)很容易并行化。為了并行化全局計(jì)數(shù),需對(duì)計(jì)數(shù)部分進(jìn)行劃分,具體的分解通過對(duì)單詞子集分配計(jì)數(shù)任務(wù)來實(shí)現(xiàn)。在Hadoop中需計(jì)算單詞的哈希值,并根據(jù)哈希值來劃分工作。
一旦map任務(wù)找到給定哈希的所有條目,它就可以將鍵/值對(duì)發(fā)送到reducer,在MapReduce中,發(fā)送部分通常稱為shuffle。從所有mapper中接收完所有的鍵/值對(duì)后,reducer才會(huì)組合這些值(如果可能,在mapper中也可部分組合這些值),并對(duì)整個(gè)聚合進(jìn)行計(jì)算,在這種情況下只進(jìn)行求和。單個(gè)reducer將查看給定單詞的所有值。
下面介紹Spark中單詞計(jì)數(shù)程序的日志輸出(Spark在默認(rèn)情況下輸出的日志會(huì)非常冗長,為了輸出關(guān)鍵的日志信息,可將conf /log4j.properties文件中的INFO替換為ERROR或FATAL):
這個(gè)過程發(fā)生的唯一的事情是元數(shù)據(jù)操作,Spark不會(huì)觸及數(shù)據(jù)本身,它會(huì)估計(jì)數(shù)據(jù)集的大小和分區(qū)數(shù)。默認(rèn)情況下是HDFS塊數(shù),但是可使用minPartitions參數(shù)明確指定最小分區(qū)數(shù):
下面定義另一個(gè)RDD,它源于linesRdd:
在2 GB的文本數(shù)據(jù)(共有40 291行,353 087個(gè)單詞)上執(zhí)行單詞計(jì)算程序時(shí),進(jìn)行讀取、分詞和按詞分組所花的時(shí)間不到1秒。通過擴(kuò)展日志記錄可看到以下內(nèi)容:
Spark打開幾個(gè)端口與執(zhí)行器和用戶通信
Spark UI運(yùn)行的端口為4040(可通過http://localhost: 4040打開)
可從本地或分布式存儲(chǔ)(HDFS、Cassandra和S3)中讀取文件
如果Spark構(gòu)建時(shí)支持Hive,它會(huì)連接到Hive上
Spark使用惰性求值(僅當(dāng)輸出請(qǐng)求時(shí))來執(zhí)行管道
Spark使用內(nèi)部調(diào)度器將作業(yè)拆分為任務(wù),優(yōu)化執(zhí)行任務(wù),然后執(zhí)行它們
結(jié)果存儲(chǔ)在RDD中,可用集合方法來保存或?qū)氲綀?zhí)行shell的節(jié)點(diǎn)的RAM中
并行性能調(diào)整的原則是在不同節(jié)點(diǎn)或線程之間分割工作負(fù)載,使得開銷相對(duì)較小,而且要保持負(fù)載平衡。
3.3.2 基于流的單詞計(jì)數(shù)
Spark支持對(duì)輸入流進(jìn)行監(jiān)聽,能對(duì)其進(jìn)行分區(qū),并以接近實(shí)時(shí)的方式來計(jì)算聚合。目前支持來自Kafka、Flume、HDFS/S3、Kinesis、Twitter,以及傳統(tǒng)的MQ(如ZeroMQ和MQTT)的數(shù)據(jù)流。在Spark中,流的傳輸是以小批量(micro-batch)方式進(jìn)行的。在Spark內(nèi)部會(huì)將輸入數(shù)據(jù)分成小批量,通常按大小的不同,有些所花的時(shí)間不到1秒,有些卻要幾分鐘,然后會(huì)對(duì)這些小批量數(shù)據(jù)執(zhí)行RDD聚合操作。
下面擴(kuò)展前面介紹的Flume示例。這需要修改Flume配置文件來創(chuàng)建一個(gè)Spark輪詢槽(polling sink),用這種槽來替代HDFS:
現(xiàn)在不用寫入HDFS,Flume將會(huì)等待Spark的輪詢數(shù)據(jù):
為了運(yùn)行程序,在一個(gè)窗口中啟動(dòng)Flume代理:
然后在另一個(gè)窗口運(yùn)行FlumeWordCount對(duì)象:
現(xiàn)在任何輸入到netcat連接的文本都將被分詞并在6秒的滑動(dòng)窗口上按每2秒計(jì)算單詞的量:
Spark/Scala允許在不同的流之間無縫切換。例如,Kafka發(fā)布/訂閱主題模型類似于如下形式:
要啟動(dòng)Kafka代理,首先下載最新發(fā)布的二進(jìn)制包并啟動(dòng)ZooKeeper。ZooKeeper是一個(gè)分布式服務(wù)協(xié)調(diào)器,即使Kafka部署在單節(jié)點(diǎn)上也需要它:
在另一個(gè)窗口中啟動(dòng)Kafka服務(wù)器:
運(yùn)行KafkaWordCount對(duì)象:
現(xiàn)在將單詞流發(fā)布到Kafka主題中,這需要再開啟一個(gè)計(jì)數(shù)窗口:
從上面的結(jié)果可以看出程序每兩秒輸出一次。Spark流有時(shí)被稱為小批次處理(micro-batch processing)。數(shù)據(jù)流有許多其他應(yīng)用程序(和框架),但要完全討論清楚會(huì)涉及很多內(nèi)容,因此需要單獨(dú)進(jìn)行介紹。在第5章會(huì)討論一些數(shù)據(jù)流上的機(jī)器學(xué)習(xí)問題。下面將介紹更傳統(tǒng)的類SQL接口。
3.3.3 Spark SQL和數(shù)據(jù)框
數(shù)據(jù)框(Data Frame)相對(duì)較新,在Spark的1.3版本中才引入,它允許人們使用標(biāo)準(zhǔn)的SQL語言來分析數(shù)據(jù)。在第1章就使用了一些SQL命令來進(jìn)行數(shù)據(jù)分析。SQL對(duì)于簡(jiǎn)單的數(shù)據(jù)分析和聚合非常有用。
最新的調(diào)查結(jié)果表明大約有70%的Spark用戶使用DataFrame。雖然DataFrame最近成為表格數(shù)據(jù)最流行的工作框架,但它是一個(gè)相對(duì)重量級(jí)的對(duì)象。DataFrame使用的管道在執(zhí)行速度上可能比基于Scala的vector或LabeledPoint(這兩個(gè)對(duì)象將在下一章討論)的速度慢得多。來自多名開發(fā)人員的證據(jù)表明:響應(yīng)時(shí)間可為幾十或幾百毫秒,這與具體查詢有關(guān),若是更簡(jiǎn)單的對(duì)象會(huì)小于1毫秒。
Spark為SQL實(shí)現(xiàn)了自己的shell,這是除標(biāo)準(zhǔn)Scala REPL shell以外的另一個(gè)shell。可通過./bin/spark-sql來運(yùn)行該shell,還可通過這種shell來訪問Hive/Impala或關(guān)系數(shù)據(jù)庫表:
在標(biāo)準(zhǔn)Spark的REPL中,可以通過運(yùn)行相同的查詢來執(zhí)行以下命令:
總結(jié)
以上是生活随笔為你收集整理的《Scala机器学习》一一3.3 应用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: BZOJ 1257: [CQOI2007
- 下一篇: 运营商市场定位决定移动互联网的成败