Gradient-Boosted Tree Regression (GBDT): Algorithm Introduction and Spark MLlib Examples (Scala/Java/Python)
Gradient-Boosted Tree Regression
Algorithm overview: Gradient-boosted trees (GBTs) are an ensemble algorithm built from decision trees. They train decision trees iteratively in order to minimize a loss function. Like decision trees, GBTs can handle categorical features, extend to the multiclass setting, and require no feature scaling. spark.ml implements them on top of the existing decision tree implementation.
GBTs train a sequence of decision trees one at a time. On each iteration, the algorithm uses the current ensemble to predict the label of every training instance, then compares each prediction with the true label. The dataset is re-labeled so that instances with poor predictions receive more emphasis; the tree trained in the next iteration therefore corrects the mistakes of its predecessors.
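To make this loop concrete, here is a minimal from-scratch sketch (illustrative only, not Spark's implementation): it boosts depth-1 regression stumps, and under squared-error loss the "re-labeled" targets are simply the residuals, i.e. the negative gradients of the loss. The data, the Stump class, and the brute-force stump fitter are all invented for this example.

object GbtSketch {
  final case class Stump(threshold: Double, leftValue: Double, rightValue: Double) {
    def predict(x: Double): Double = if (x <= threshold) leftValue else rightValue
  }

  private def mean(a: Seq[Double]): Double = if (a.isEmpty) 0.0 else a.sum / a.size

  // Fit a depth-1 tree by trying every observed x value as the split point.
  def fitStump(xs: Array[Double], targets: Array[Double]): Stump = {
    val candidates = xs.distinct.map { t =>
      val (l, r) = xs.zip(targets).partition { case (x, _) => x <= t }
      val stump = Stump(t, mean(l.map(_._2)), mean(r.map(_._2)))
      val sse = xs.zip(targets).map { case (x, y) => math.pow(y - stump.predict(x), 2) }.sum
      (sse, stump)
    }
    candidates.minBy(_._1)._2
  }

  def main(args: Array[String]): Unit = {
    val xs = Array(1.0, 2.0, 3.0, 4.0, 5.0)
    val ys = Array(1.2, 1.9, 3.1, 4.2, 4.8)
    val stepSize = 0.5 // shrinkage; plays the same role as GBT's stepSize parameter
    var ensemble = Vector.empty[Stump]
    for (_ <- 1 to 20) {
      // Current ensemble prediction for every training instance.
      val preds = xs.map(x => ensemble.map(_.predict(x)).sum * stepSize)
      // Under squared error the "re-labeled" target is the residual
      // (the negative gradient of the loss), so badly predicted
      // instances automatically carry larger targets.
      val residuals = ys.zip(preds).map { case (y, p) => y - p }
      ensemble = ensemble :+ fitStump(xs, residuals)
    }
    val finalPreds = xs.map(x => ensemble.map(_.predict(x)).sum * stepSize)
    println(finalPreds.mkString(", "))
  }
}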
The re-labeling mechanism is specified by a loss function: on every iteration, the GBT further reduces the value of this loss function on the training data. spark.ml provides one loss function for classification (log loss) and two for regression (squared error and absolute error).
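The loss is chosen through the lossType parameter. A quick sketch of the corresponding spark.ml calls (the variable names are just for illustration):

import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.regression.GBTRegressor

// Regression supports two losses.
val squaredGbt  = new GBTRegressor().setLossType("squared")   // squared error (the default)
val absoluteGbt = new GBTRegressor().setLossType("absolute")  // absolute error

// Classification supports log loss only.
val logLossGbt = new GBTClassifier().setLossType("logistic")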
spark.ml supports GBTs for binary classification and for regression, using both continuous and categorical features.
*Note: gradient-boosted trees do not yet support multiclass classification.
Parameters (a configuration sketch showing these setters follows the list):
- checkpointInterval (Int): checkpoint interval (>= 1), or -1 to disable checkpointing.
- featuresCol (String): name of the features column.
- impurity (String): criterion used for the information gain calculation (case-insensitive).
- labelCol (String): name of the label column.
- lossType (String): loss function type.
- maxBins (Int): maximum number of bins used for discretizing continuous features and for choosing how to split on features at each node.
- maxDepth (Int): maximum depth of a tree (>= 0).
- maxIter (Int): number of iterations (>= 0), i.e. the number of trees trained.
- minInfoGain (Double): minimum information gain required to split a node.
- minInstancesPerNode (Int): minimum number of instances each child node must have after a split.
- predictionCol (String): name of the prediction output column.
- seed (Long): random seed.
- subsamplingRate (Double): fraction of the training data used to learn each tree, in the range [0, 1].
- stepSize (Double): step size (learning rate) for each optimization iteration.
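A hedged configuration sketch tying the parameters above to their GBTRegressor setters; the values are arbitrary examples, not tuning recommendations:

import org.apache.spark.ml.regression.GBTRegressor

val gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
  .setLossType("squared")
  .setImpurity("variance")        // the only impurity supported for regression trees
  .setMaxDepth(5)
  .setMaxBins(32)
  .setMaxIter(20)
  .setMinInfoGain(0.0)
  .setMinInstancesPerNode(1)
  .setStepSize(0.1)
  .setSubsamplingRate(0.8)
  .setCheckpointInterval(10)      // only takes effect if a checkpoint dir is set on the SparkContext
  .setSeed(42L)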
Example: on the toy dataset below, GBTRegressor effectively needs only a single iteration, which would not be the case in practice.
Scala:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Chain indexer and GBT in a Pipeline.
val pipeline = new Pipeline().setStages(Array(featureIndexer, gbt))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)

val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println("Learned regression GBT model:\n" + gbtModel.toDebugString)

Java:
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a GBT model.
GBTRegressor gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Chain indexer and GBT in a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

GBTRegressionModel gbtModel = (GBTRegressionModel) (model.stages()[1]);
System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());

Python:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = \
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing).
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline.
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)  # summary only
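One follow-up that applies to all three variants: the fitted PipelineModel can be saved and reloaded through the standard ML persistence API. A minimal Scala sketch, reusing model from the Scala example above; the path is a placeholder:

import org.apache.spark.ml.PipelineModel

// Persist the fitted pipeline (feature indexer + GBT model) and load it back.
model.write.overwrite().save("/tmp/gbt-regression-pipeline")
val sameModel = PipelineModel.load("/tmp/gbt-regression-pipeline")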