Spark ML Pipelines
As of Spark 2.0, MLlib is split into two packages: spark.mllib contains the RDD-based API, and spark.ml contains the DataFrame-based API. No new features are being added to the RDD-based spark.mllib, so the ml package is the recommended one. The RDD-based API is expected to be deprecated in Spark 2.2 and removed entirely in Spark 3.0.
Main concepts in Pipelines
DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
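As a minimal sketch of this idea (the column names and values here are made up for illustration), a single DataFrame can hold a text column, a double label, and a feature-vector column side by side:

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DataFrameSketch").getOrCreate()

// One DataFrame can mix column types: raw text, a numeric label, and a feature vector.
val df = spark.createDataFrame(Seq(
  ("spark is great", 1.0, Vectors.dense(0.0, 1.1)),
  ("hadoop mapreduce", 0.0, Vectors.dense(2.0, 1.0))
)).toDF("text", "label", "features")

df.printSchema()  // text: string, label: double, features: vector
```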
Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
Parameter: All Transformers and Estimators now share a common API for specifying parameters.
Pipeline components
Transformers
A Transformer is an abstraction that includes feature transformers and learned models. Technically, a Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns. For example:
- A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
- A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.
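For example, here is a small sketch of a feature transformer at work, using the built-in Tokenizer and reusing the spark session from the sketch above (column names are illustrative):

```scala
import org.apache.spark.ml.feature.Tokenizer

// A Transformer: transform() reads the "text" column and appends a "words" column.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val sentences = spark.createDataFrame(Seq(
  (0L, "spark ml pipelines"),
  (1L, "hadoop mapreduce")
)).toDF("id", "text")

val tokenized = tokenizer.transform(sentences)  // original columns plus "words"
tokenized.show()
```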
Estimators
An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data. Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer. For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.
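A short sketch of that contract, with arbitrary toy data (again reusing the spark session from above): fit() on the Estimator returns a model, and the model is itself a Transformer:

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0))
)).toDF("label", "features")

// LogisticRegression is an Estimator; fit() produces a LogisticRegressionModel,
// which is a Transformer and is used via transform().
val lr = new LogisticRegression().setMaxIter(10)
val model = lr.fit(training)
val predictions = model.transform(training)  // appends prediction-related columns
```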
Properties of pipeline components
Transformer.transform()s and Estimator.fit()s are both stateless. In the future, stateful algorithms may be supported via alternative concepts.
Each instance of a Transformer or Estimator has a unique ID, which is useful in specifying parameters (discussed below).
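For instance (continuing with the lr instance from the sketch above; the printed ID is illustrative):

```scala
println(lr.uid)             // e.g. "logreg_a1b2c3d4e5f6" -- this instance's unique ID
println(lr.maxIter.parent)  // each Param is tied to its owner's unique ID
```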
Pipeline
In machine learning, it is common to run a sequence of algorithms to process and learn from data. E.g., a simple text document processing workflow might include several stages:
- Split each document’s text into words.
- Convert each document’s words into a numerical feature vector.
- Learn a prediction model using the feature vectors and labels.
MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order. We will use this simple workflow as a running example in this section.
How it works
A Pipeline is specified as a sequence of stages, each of which is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. For Transformer stages, transform() is called on the DataFrame. For Estimator stages, fit() is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() is called on the DataFrame.
In the figure from the official documentation, the top row represents a Pipeline with three stages. The first two, Tokenizer and HashingTF, are Transformers (blue); the third, LogisticRegression, is an Estimator (red). The bottom row represents data flowing through the pipeline, where the cylinders are DataFrames.
On the left, Pipeline.fit() is called on the original DataFrame, which contains raw text and labels.
Tokenizer.transform() splits the text into words, adding a words column to the DataFrame; HashingTF.transform() then converts the words column into feature vectors, adding the vector column to the DataFrame.
Since LogisticRegression is an Estimator, the Pipeline first calls LogisticRegression.fit() to produce a LogisticRegressionModel. If the Pipeline had more stages, it would call the LogisticRegressionModel’s transform() on the DataFrame before sending the DataFrame to the next stage.
The second figure shows the resulting PipelineModel, which has the same number of stages as the original Pipeline, but all the Estimators have become Transformers. When the PipelineModel’s transform() is called on a test set, the data passes through the fitted stages in order, with each stage’s transform() applied before the data moves on to the next stage.
Pipelines and PipelineModels ensure that training and test data go through identical feature-processing steps.
Parameters
MLlib Estimators and Transformers use a uniform API for specifying parameters.
A Param is a named parameter; a ParamMap is a set of (parameter, value) pairs.
There are two main ways to pass parameters to an algorithm: call setter methods on an instance (e.g., lr.setMaxIter(10)), or pass a ParamMap to fit() or transform(), in which case parameters in the ParamMap override values set earlier via setters.
Code examples:
Example: Estimator, Transformer, and Param
```scala
package org.apache.spark.examples.ml

// $example on$
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// $example off$
import org.apache.spark.sql.SparkSession

object EstimatorTransformerParamExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("EstimatorTransformerParamExample")
      .getOrCreate()

    // $example on$
    // Prepare training data from a list of (label, features) tuples.
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    // Create a LogisticRegression instance. This instance is an Estimator.
    val lr = new LogisticRegression()
    // Print out the parameters, documentation, and any default values.
    println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

    // We may set parameters using setter methods.
    lr.setMaxIter(10)
      .setRegParam(0.01)

    // Learn a LogisticRegression model. This uses the parameters stored in lr.
    val model1 = lr.fit(training)
    // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
    // we can view the parameters it used during fit().
    // This prints the parameter (name: value) pairs, where names are unique IDs for this
    // LogisticRegression instance.
    println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

    // We may alternatively specify parameters using a ParamMap,
    // which supports several methods for specifying parameters.
    val paramMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
      .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

    // One can also combine ParamMaps.
    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
    val paramMapCombined = paramMap ++ paramMap2

    // Now learn a new model using the paramMapCombined parameters.
    // paramMapCombined overrides all parameters set earlier via lr.set* methods.
    val model2 = lr.fit(training, paramMapCombined)
    println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

    // Prepare test data.
    val test = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label", "features")

    // Make predictions on test data using the Transformer.transform() method.
    // LogisticRegression.transform will only use the 'features' column.
    // Note that model2.transform() outputs a 'myProbability' column instead of the usual
    // 'probability' column since we renamed the lr.probabilityCol parameter previously.
    model2.transform(test)
      .select("features", "label", "myProbability", "prediction")
      .collect()
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println(s"($features, $label) -> prob=$prob, prediction=$prediction")
      }
    // $example off$

    spark.stop()
  }
}
```

If running the example fails, see my previous post:
spark Exception in thread “main” java.lang.IllegalArgumentException: java.net.URISyntaxException
Example: Pipeline
```scala
package org.apache.spark.examples.ml

// $example on$
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// $example off$
import org.apache.spark.sql.SparkSession

object PipelineExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("PipelineExample")
      .getOrCreate()

    // $example on$
    // Prepare training documents from a list of (id, text, label) tuples.
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

    // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)

    // Now we can optionally save the fitted pipeline to disk
    model.write.overwrite().save("/tmp/spark-logistic-regression-model")

    // We can also save this unfit pipeline to disk
    pipeline.write.overwrite().save("/tmp/unfit-lr-model")

    // And load it back in during production
    val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

    // Prepare test documents, which are unlabeled (id, text) tuples.
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "mapreduce spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // Make predictions on test documents.
    model.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }
    // $example off$

    spark.stop()
  }
}
```

Reference link: the official documentation.