【机器学习】在机器学习中处理大量数据!
作者:牧小熊,華中農業大學,Datawhale成員
知乎 |?https://zhuanlan.zhihu.com/p/357361005
之前系統梳理過大數據概念和基礎知識(可點擊),本文基于PySpark在機器學習實踐中的用法,希望對大數據學習的同學起到拋磚引玉的作用。(當數據集較小時,用Pandas足夠,當數據量較大時,就需要利用分布式數據處理工具,Spark很適用)
1.PySpark簡介
Apache Spark是一個閃電般快速的實時處理框架。它進行內存計算以實時分析數據。由于Apache Hadoop MapReduce僅執行批處理并且缺乏實時處理功能,因此它開始出現。因此,引入了Apache Spark,因為它可以實時執行流處理,也可以處理批處理。
Apache Spark是Scala語言實現的一個計算框架。為了支持Python語言使用Spark,Apache Spark社區開發了一個工具PySpark。我們可以通過Python語言操作RDDs
RDD簡介
RDD (Resiliennt Distributed Datasets)
?RDD = 彈性 + 分布式 Datasets
1)分布式,好處是讓數據在不同工作節點并行存儲,并行計算
2)彈性,指的節點存儲時,既可以使用內存,也可以使用外存
?RDD還有個特性是延遲計算,也就是一個完整的RDD運行任務分成兩部分:Transformation和Action
Spark RDD的特性:
分布式:可以分布在多臺機器上進行并行處理
彈性:計算過程中內存不夠時,它會和磁盤進行數據交換
基于內存:可以全部或部分緩存在內存中
只讀:不能修改,只能通過轉換操作生成新的 RDD
2.Pandas和PySpark對比
可以參考這位作者的,詳細的介紹了pyspark與pandas之間的區別:
https://link.zhihu.com/?target=https%3A//blog.csdn.net/suzyu12345/article/details/79673483
3.PySpark實戰小練
數據集:從1994年人口普查數據庫中提取。(后臺回復“210323”可獲取)
TO DO:預測一個人新收入是否會超過5萬美金
參數說明:
創建SparkSession
from?pyspark.sql?import?SparkSession spark=SparkSession.builder.appName('adult').getOrCreate()讀取數據
df?=?spark.read.csv('adult.csv',?inferSchema?=?True,?header=True)?#讀取csv文件 df.show(3)??#用來顯示前3行注意:pyspark必須創建SparkSession才能像類似于pandas一樣操作數據集
我們看看數據集:
cols?=?df.columns?#和pandas一樣看列名 df.printSchema()root
|-- age: integer (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: integer (nullable = true)
|-- education: string (nullable = true)
|-- education-num: integer (nullable = true)
|-- marital-status: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital-gain: integer (nullable = true)
|-- capital-loss: integer (nullable = true)
|-- hours-per-week: integer (nullable = true)
|-- native-country: string (nullable = true)
|-- income: string (nullable = true)
對于類別變量我們需要進行編碼,在pyspark中提供了StringIndexer, OneHotEncoder, VectorAssembler特征編碼模式:
from?pyspark.ml?import?Pipeline from?pyspark.ml.feature?import?StringIndexer,?OneHotEncoder,?VectorAssemblerstages?=?[] for?col?in?cat_features:#?字符串轉成索引string_index?=?StringIndexer(inputCol?=?col,?outputCol?=?col?+?'Index')#?轉換為OneHot編碼encoder?=?OneHotEncoder(inputCols=[string_index.getOutputCol()],?outputCols=[col?+?"_one_hot"])#?將每個字段的轉換方式?放到stages中stages?+=?[string_index,?encoder]#?將income轉換為索引 label_string_index?=?StringIndexer(inputCol?=?'income',?outputCol?=?'label') #?添加到stages中 stages?+=?[label_string_index]#?類別變量?+?數值變量 assembler_cols?=?[c?+?"_one_hot"?for?c?in?cat_features]?+?num_features assembler?=?VectorAssembler(inputCols=assembler_cols,?outputCol="features") stages?+=?[assembler]#?使用pipeline完成數據處理 pipeline?=?Pipeline(stages=stages) pipeline_model?=?pipeline.fit(df) df?=?pipeline_model.transform(df) selected_cols?=?["label",?"features"]?+?cols df?=?df.select(selected_cols)因為pyspark顯示的數據比較像Mysql 那樣不方便觀看,因此我們轉成pandas:
import?pandas?as?pd pd.DataFrame(df.take(20),?columns?=?df.columns)通過pandas發現,好像還有較多字符串變量,難道特征編碼失敗了?
原來是使用VectorAssembler直接將特征轉成了features這一列,pyspark做ML時 需要特征編碼好了并做成向量列,
到這里,數據的特征工程就做好了。
分割數據集 測試集
train,?test?=?df.randomSplit([0.7,?0.3],?seed=2021) print(train.count()) print(test.count())22795
9766
可以看到,訓練集和測試集安裝7:3的比例分割了,接下來就是構建模型進行訓練。
邏輯回歸
from?pyspark.ml.classification?import?LogisticRegression #?創建模型 lr?=?LogisticRegression(featuresCol?=?'features',?labelCol?=?'label',maxIter=10) lr_model?=?lr.fit(train)可以看到ML的用法和sklearn非常的像,因此使用起來也是相當的方便。
#結果預測predictions?=?lr_model.transform(test)看看predictions的結構
predictions.printSchema()root
|-- label: double (nullable = false)
|-- features: vector (nullable = true)
|-- age: integer (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: integer (nullable = true)
|-- education: string (nullable = true)
|-- education-num: integer (nullable = true)
|-- marital-status: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital-gain: integer (nullable = true)
|-- capital-loss: integer (nullable = true)
|-- hours-per-week: integer (nullable = true)
|-- native-country: string (nullable = true)
|-- income: string (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = false)
抽取需要的信息
selected?=?predictions.select("label",?"prediction",?"probability",?"age",?"occupation") display(selected) selected.show(4)技術AUC值
AUC:0.9062153434371653
進行網格調參
from?pyspark.ml.tuning?import?ParamGridBuilder,?CrossValidator#?創建網絡參數,用于交叉驗證 param_grid?=?(ParamGridBuilder().addGrid(lr.regParam,?[0.01,?0.5,?2.0]).addGrid(lr.elasticNetParam,?[0.0,?0.5,?1.0]).addGrid(lr.maxIter,?[1,?5,?10]).build()) #?五折交叉驗證,設置模型,網格參數,驗證方法,折數 cv?=?CrossValidator(estimator=lr,?estimatorParamMaps=param_grid,?evaluator=evaluator,?numFolds=5) #?交叉驗證運行 cv_model?=?cv.fit(train) #?對于測試數據,使用五折交叉驗證 predictions?=?cv_model.transform(test) print('AUC:',?evaluator.evaluate(predictions))AUC:0.9054096433333642
決策樹模型
from?pyspark.ml.classification?import?DecisionTreeClassifier#?創建決策樹模型 dt?=?DecisionTreeClassifier(featuresCol?=?'features',?labelCol?=?'label',?maxDepth?=?3) dt_model?=?dt.fit(train)#查看決策樹結構 print(dt_model._call_java('toDebugString'))DecisionTreeClassificationModel: uid=DecisionTreeClassifier_4bd113e9a3c2, depth=3, numNodes=11, numClasses=2, numFeatures=100
If (feature 23 in {0.0})
If (feature 97 <= 7792.0)
Predict: 0.0
Else (feature 97 > 7792.0)
If (feature 94 <= 19.5)
Predict: 0.0
Else (feature 94 > 19.5)
Predict: 1.0
Else (feature 23 not in {0.0})
If (feature 96 <= 12.5)
If (feature 97 <= 3368.0)
Predict: 0.0
Else (feature 97 > 3368.0)
Predict: 1.0
Else (feature 96 > 12.5)
Predict: 1.0
root
|-- label: double (nullable = false)
|-- features: vector (nullable = true)
|-- age: integer (nullable = true)
|-- workclass: string (nullable = true)
|-- fnlwgt: integer (nullable = true)
|-- education: string (nullable = true)
|-- education-num: integer (nullable = true)
|-- marital-status: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capital-gain: integer (nullable = true)
|-- capital-loss: integer (nullable = true)
|-- hours-per-week: integer (nullable = true)
|-- native-country: string (nullable = true)
|-- income: string (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = false)
0.7455098804457034
網格搜參數
from?pyspark.ml.tuning?import?ParamGridBuilder,?CrossValidator param_grid?=?(ParamGridBuilder().addGrid(dt.maxDepth,?[1,?2,?6,?10]).addGrid(dt.maxBins,?[20,?40,?80]).build()) #?設置五折交叉驗證 cv?=?CrossValidator(estimator=dt,?estimatorParamMaps=param_grid,?evaluator=evaluator,?numFolds=5) #?運行cv cv_model?=?cv.fit(train) #?查看最優模型 print("numNodes?=?",?cv_model.bestModel.numNodes) print("depth?=?",?cv_model.bestModel.depth)numNodes = 429
depth = 10
0.7850384321616918
隨機森林
from?pyspark.ml.classification?import?RandomForestClassifier #?隨機森林 rf?=?RandomForestClassifier(featuresCol?=?'features',?labelCol?=?'label') rf_model?=?rf.fit(train) predictions?=?rf_model.transform(test) predictions.printSchema() selected?=?predictions.select("label",?"prediction",?"probability",?"age",?"occupation") display(selected)evaluator?=?BinaryClassificationEvaluator() evaluator.evaluate(predictions)0.8932162982538805
一樣的方法
param_grid?=?(ParamGridBuilder().addGrid(rf.maxDepth,?[3,?5,?7]).addGrid(rf.maxBins,?[20,?50]).addGrid(rf.numTrees,?[5,?10]).build()) cv?=?CrossValidator(estimator=rf,?estimatorParamMaps=param_grid,?evaluator=evaluator,?numFolds=5) #?運行CV(大約6分鐘) cv_model?=?cv.fit(train) predictions?=?cv_model.transform(test) evaluator.evaluate(predictions)0.8948376797236669
查看模型的結構和特征重要性
best_modelRandomForestClassificationModel: uid=RandomForestClassifier_15bbbdd6642a, numTrees=10, numClasses=2, numFeatures=100
best_model.featureImportances太長了省略
df.schema['features'].metadata temp?=?df.schema["features"].metadata["ml_attr"]["attrs"] df_importance?=?pd.DataFrame(columns=['idx',?'name']) for?attr?in?temp['numeric']:temp_df?=?{}temp_df['idx']?=?attr['idx']temp_df['name']?=?attr['name']#print(temp_df)df_importance?=?df_importance.append(temp_df,?ignore_index=True)#print(attr['idx'],?attr['name'])#print(attr)#break df_importance for?attr?in?temp['binary']:temp_df?=?{}temp_df['idx']?=?attr['idx']temp_df['name']?=?attr['name']df_importance?=?df_importance.append(temp_df,?ignore_index=True) df_importance對特征重要性進行可視化
4.小結
本節選用了一個常規的數據集,需要通過UCI提供的數據預測個人收入是否會大于5萬,本節用PySpark對數據進行了讀取,特征的編碼以及特征的構建,并分別使用了邏輯回歸、決策樹以及隨機森林算法展示數據預測的過程。
spark通過封裝成pyspark后使用難度降低了很多,而且pyspark的ML包提供了基本的機器學習模型,可以直接使用,模型的使用方法和sklearn比較相似,因此學習成本較低。
往期精彩回顧適合初學者入門人工智能的路線及資料下載機器學習及深度學習筆記等資料打印機器學習在線手冊深度學習筆記專輯《統計學習方法》的代碼復現專輯 AI基礎下載機器學習的數學基礎專輯 本站qq群851320808,加入微信群請掃碼:總結
以上是生活随笔為你收集整理的【机器学习】在机器学习中处理大量数据!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【效率】超详细!手把手带你快速入门 Gi
- 下一篇: win7系统字体模糊发虚不清晰的解决方法