1. spark ML概述
ML組件
ML的標(biāo)準(zhǔn)API使用管道(pipeline)這樣的方式,可以將多個(gè)算法或者數(shù)據(jù)處理過程整合到一個(gè)管道或者一個(gè)流程里運(yùn)行,其中包含下面幾個(gè)部分:
1. dataFrame:用于ML的dataset,保存數(shù)據(jù)
2. transformer:將一個(gè)dataFrame按照某種計(jì)算轉(zhuǎn)換成另外一個(gè)dataFrame,例如把一個(gè)包含特征的dataFrame通過模型預(yù)測(cè),生成一個(gè)包含特征和預(yù)測(cè)的dataFrame
3. estimator:根據(jù)訓(xùn)練樣本進(jìn)行模型訓(xùn)練(fit),并且得到一個(gè)對(duì)應(yīng)的transformer
4. pipeline:將多個(gè)transformer和estimator串成一個(gè)ML的工作流
5. parameter:transformer和estimator共用一套API來確定參數(shù)
spark的機(jī)器學(xué)習(xí)包有兩個(gè),一個(gè)是ML,一個(gè)是MLlib,前者是基于dataFrame的API實(shí)現(xiàn)的,后者是基于RDD的API實(shí)現(xiàn)的,官網(wǎng)推薦用前者,使用比較方便。
transformer
一個(gè)transformer包含特征轉(zhuǎn)換和已學(xué)習(xí)得到的數(shù)據(jù)模型,它實(shí)現(xiàn)了一個(gè)方法transform()
例如:一個(gè)特征transformer可能將一個(gè)dataFrame的某些列映射成新的列,然后輸出處理后的新的dataFrame;一個(gè)學(xué)習(xí)得到的模型將讀取一個(gè)包含特征的dataFrame,對(duì)每個(gè)樣本進(jìn)行預(yù)測(cè),并且把預(yù)測(cè)結(jié)果附加到這個(gè)dataFrame,得到一個(gè)新的dataFrame
Estimators
主要用于訓(xùn)練模型,實(shí)現(xiàn)了一個(gè)方法fit(),接受一個(gè)包含特征的dataFrame,然后訓(xùn)練得到一個(gè)模型,那個(gè)模型就是一個(gè)transformer
例如:一個(gè)LogisticRegression是一個(gè)estimator,然后通過調(diào)用fit(),得到一個(gè)LogisticRegressionModel,這是一個(gè)transformer。
每個(gè)transformer和estimator都有一個(gè)唯一ID,用于保存對(duì)應(yīng)的參數(shù)
pipeline
例如一個(gè)文本挖掘包含以下三個(gè)步驟:
1. 將文本切分成詞
2. 將詞轉(zhuǎn)換成特征向量
3. 訓(xùn)練得到一個(gè)模型,然后用于預(yù)測(cè)
spark ML將這樣一個(gè)工作流定義為pipeline,一個(gè)pipeline包含多個(gè)PipelineStages (transformer和estimator),通過dataFrame在各個(gè)stage中進(jìn)行傳遞。
這是一個(gè)訓(xùn)練模型的例子,包含了三個(gè)步驟,藍(lán)色的是指transformer,紅色是estimator
這是一個(gè)使用已訓(xùn)練模型預(yù)測(cè)樣本的例子,
Parameters
一個(gè)Paramap包含多個(gè)(parameter, value)的鍵值對(duì)
有兩種方法將參數(shù)傳給算法:
1. 將參數(shù)設(shè)置到算法的一個(gè)實(shí)例,例如lr是LogisticRegression的一個(gè)實(shí)例,則他可以調(diào)用lr.setMaxIter(10)來設(shè)置訓(xùn)練循環(huán)次數(shù)
2. 將paramap作為輸入?yún)?shù),給fit()或者transform(),這些參數(shù)會(huì)都會(huì)覆蓋掉原來set的值
我們可以將paramap傳給不同實(shí)例,例如lr1和lr2是LogisticRegression的兩個(gè)實(shí)例,我們可以建立ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)的參數(shù)列表,即將兩個(gè)實(shí)例的參數(shù)都放在paramMap中
spark1.6的版本可以使用import/export導(dǎo)出模型或者pipeline到磁盤上
范例1
import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.param.ParamMap import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.sql.Rowval training = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(0.0, 1.1, 0.1)),(0.0, Vectors.dense(2.0, 1.0, -1.0)),(0.0, Vectors.dense(2.0, 1.3, 1.0)),(1.0, Vectors.dense(0.0, 1.2, -0.5)) )).toDF("label", "features")//創(chuàng)建一個(gè)LogisticRegression實(shí)例,這是一個(gè)Estimator. val lr = new LogisticRegression() //打印參數(shù) println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")//調(diào)用實(shí)例的set方法設(shè)置參數(shù) lr.setMaxIter(10).setRegParam(0.01)// 學(xué)習(xí)LogisticRegression模型,model1是一個(gè)transformer val model1 = lr.fit(training)println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)// 通過paramap來設(shè)置參數(shù) val paramMap = ParamMap(lr.maxIter -> 20).put(lr.maxIter, 30) .put(lr.regParam -> 0.1, lr.threshold -> 0.55)// 兩個(gè)ParamMap之間可以相加合并. val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name val paramMapCombined = paramMap ++ paramMap2val model2 = lr.fit(training, paramMapCombined) println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)//測(cè)試數(shù)據(jù) val test = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(-1.0, 1.5, 1.3)),(0.0, Vectors.dense(3.0, 2.0, -0.1)),(1.0, Vectors.dense(0.0, 2.2, -1.5)) )).toDF("label", "features")//model2的transform()會(huì)只選擇features的數(shù)據(jù),不會(huì)把label數(shù)據(jù)包含進(jìn)去 model2.transform(test).select("features", "label", "myProbability", "prediction").collect().foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>println(s"($features, $label) -> prob=$prob, prediction=$prediction")} import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.mllib.linalg.Vector import org.apache.spark.sql.Rowval training = sqlContext.createDataFrame(Seq((0L, "a b c d e spark", 1.0),(1L, "b d", 0.0),(2L, "spark f g h", 1.0),(3L, "hadoop mapreduce", 0.0) )).toDF("id", "text", "label")/* 初始化一個(gè)pipeline,包含三個(gè)步驟: tokenizer, hashingTF, and lr. tokenizer 負(fù)責(zé)切詞,hashingTF負(fù)責(zé)按詞進(jìn)行特征排列,lr負(fù)責(zé)模型訓(xùn)練 */ val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words") val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features") val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01) val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))val model = pipeline.fit(training)// 將訓(xùn)練后得到的模型保存到磁盤 model.save("/tmp/spark-logistic-regression-model")// 把未訓(xùn)練的pipeline保存到磁盤 pipeline.save("/tmp/unfit-lr-model")// 從磁盤讀取模型 val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")// 測(cè)試數(shù)據(jù) val test = sqlContext.createDataFrame(Seq((4L, "spark i j k"),(5L, "l m n"),(6L, "mapreduce spark"),(7L, "apache hadoop") )).toDF("id", "text")model.transform(test).select("id", "text", "probability", "prediction").collect().foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>println(s"($id, $text) --> prob=$prob, prediction=$prediction")}model selection
- ML里面用CrossValidator類來做交叉驗(yàn)證,這個(gè)類包含一個(gè)estimator、一堆paramMap、和一個(gè)evaluator。
evaluator有三個(gè)子類,包括regressionEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator。
- ML中除了cv以外,還有一種指定樣本劃分的驗(yàn)證方式,TrainValidationSplit 類,默認(rèn)是0.75,即3/4用于做訓(xùn)練,1/4用于做測(cè)試。其他跟cv一樣
總結(jié)
以上是生活随笔為你收集整理的1. spark ML概述的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vue实现幻灯片切换效果
- 下一篇: 乘法逆元模板