desehawk 发表于 2017-1-23 18:08:26

Spark机器学习入门4·分类模型(spark-shell)如何实现的

本帖最后由 desehawk 于 2017-1-23 18:11 编辑

问题导读
1.训练分类模型是如何实现的?
2.如何使用分类模型?
3.如何实现评估性能?


static/image/hrline/4.gif

上一篇Spark机器学习入门3·推荐引擎(spark-shell)如何实现的
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20871



Spark机器学习
[*]线性模型

[*]逻辑回归--逻辑损失(logistic loss)
[*]线性支持向量机(Support Vector Machine, SVM)--合页损失(hinge loss)
[*]朴素贝叶斯(Naive Bayes)
[*]决策树
0 准备数据kaggle2.blob.core.windows.net/competitions-data/kaggle/3526/train.tsv
sed 1d train.tsv > train_noheader.tsv


0 运行环境

cd /Users/erichan/Garden/spark-1.5.1-bin-cdh4

bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4-SNAPSHOT --driver-memory 4G --executor-memory 4G --driver-cores 2

import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity._

1 提取特征


val PATH = "/Users/erichan/sourcecode/book/Spark机器学习"
val rawData = sc.textFile(PATH+"/train_noheader.tsv")
val records = rawData.map(line => line.split("\t"))
records.first

Array = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees ...
val data = records.map { r =>
    val trimmed = r.map(_.replaceAll("\"", ""))
    val label = trimmed(r.size - 1).toInt
    val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
    LabeledPoint(label, Vectors.dense(features))
}
data.cache
val numData = data.count

numData: Long = 7395

// note that some of our data contains negative feature vaues. For naive Bayes we convert these to zeros
val nbData = records.map { r =>
    val trimmed = r.map(_.replaceAll("\"", ""))
    val label = trimmed(r.size - 1).toInt
    val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble).map(d => if (d < 0) 0.0 else d)
    LabeledPoint(label, Vectors.dense(features))
}

2 训练分类模型
2.1 逻辑回归模型

// train a Logistic Regression model
val numIterations = 10
val maxTreeDepth = 5
val lrModel = LogisticRegressionWithSGD.train(data, numIterations)

2.2 SVM模型


val svmModel = SVMWithSGD.train(data, numIterations)


2.3 朴素贝叶斯
val nbModel = NaiveBayes.train(nbData)



2.4 决策树
val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)



3 使用分类模型
3.1 预测

以逻辑回归模型为例
val dataPoint = data.first
val prediction = lrModel.predict(dataPoint.features)

prediction: Double = 1.0

val trueLabel = dataPoint.label


trueLabel: Double = 0.0

val predictions = lrModel.predict(data.map(lp => lp.features))
predictions.take(5)

Array = Array(1.0, 1.0, 1.0, 1.0, 1.0)



4 评估性能
4.1 逻辑回归模型的正确率

val lrTotalCorrect = data.map { point =>
if (lrModel.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracy = lrTotalCorrect / numData

4.2 SVM模型的正确率


val svmTotalCorrect = data.map { point =>
if (svmModel.predict(point.features) == point.label) 1 else 0
}.sum
val svmAccuracy = svmTotalCorrect / numData

svmAccuracy: Double = 0.5146720757268425


4.3 朴素贝叶斯的正确率


val nbTotalCorrect = nbData.map { point =>
if (nbModel.predict(point.features) == point.label) 1 else 0
}.sum
val nbAccuracy = nbTotalCorrect / numData
nbAccuracy: Double = 0.5803921568627451


4.4 决策树的正确率
// decision tree threshold needs to be specified
val dtTotalCorrect = data.map { point =>
val score = dtModel.predict(point.features)
val predicted = if (score > 0.5) 1 else 0
if (predicted == point.label) 1 else 0
}.sum
val dtAccuracy = dtTotalCorrect / numData


dtAccuracy: Double = 0.6482758620689655


4.5 ROC曲线和AUC
val metrics = Seq(lrModel, svmModel).map { model =>
    val scoreAndLabels = data.map { point =>
      (model.predict(point.features), point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val nbMetrics = Seq(nbModel).map{ model =>
    val scoreAndLabels = nbData.map { point =>
      val score = model.predict(point.features)
      (if (score > 0.5) 1.0 else 0.0, point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val dtMetrics = Seq(dtModel).map{ model =>
    val scoreAndLabels = data.map { point =>
      val score = model.predict(point.features)
      (if (score > 0.5) 1.0 else 0.0, point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val allMetrics = metrics ++ nbMetrics ++ dtMetrics
allMetrics.foreach{ case (m, pr, roc) =>
    println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
}

LogisticRegressionModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
SVMModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
NaiveBayesModel, Area under PR: 68.0851%, Area under ROC: 58.3559%
DecisionTreeModel, Area under PR: 74.3081%, Area under ROC: 64.8837%


5 改进和调优
5.1 特征标准化


val vectors = data.map(lp => lp.features)
val matrix = new RowMatrix(vectors)
val matrixSummary = matrix.computeColumnSummaryStatistics()

println(matrixSummary.mean)
println(matrixSummary.min)
println(matrixSummary.max)
println(matrixSummary.variance)
println(matrixSummary.numNonzeros)










val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
val scaledData = data.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))

println(data.first.features)
println(scaledData.first.features)
println((0.789131 - 0.41225805299526636)/math.sqrt(0.1097424416755897))





1.137647336497682
使用标准化重新训练
val lrModelScaled = LogisticRegressionWithSGD.train(scaledData, numIterations)
val lrTotalCorrectScaled = scaledData.map { point =>
if (lrModelScaled.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracyScaled = lrTotalCorrectScaled / numData
// lrAccuracyScaled: Double = 0.6204192021636241
val lrPredictionsVsTrue = scaledData.map { point =>
    (lrModelScaled.predict(point.features), point.label)
}
val lrMetricsScaled = new BinaryClassificationMetrics(lrPredictionsVsTrue)
val lrPr = lrMetricsScaled.areaUnderPR
val lrRoc = lrMetricsScaled.areaUnderROC

println(f"${lrModelScaled.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaled * 100}%2.4f%%\nArea under PR: ${lrPr * 100.0}%2.4f%%\nArea under ROC: ${lrRoc * 100.0}%2.4f%%")


LogisticRegressionModel
Accuracy: 62.0419%
Area under PR: 72.7254%
Area under ROC: 61.9663%
5.2 其他特征
val categories = records.map(r => r(3)).distinct.collect.zipWithIndex.toMap
val numCategories = categories.size
val dataCategories = records.map { r =>
    val trimmed = r.map(_.replaceAll("\"", ""))
    val label = trimmed(r.size - 1).toInt
    val categoryIdx = categories(r(3))
    val categoryFeatures = Array.ofDim(numCategories)
    categoryFeatures(categoryIdx) = 1.0
    val otherFeatures = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
    val features = categoryFeatures ++ otherFeatures
    LabeledPoint(label, Vectors.dense(features))
}

println(dataCategories.first)

(0.0,)
标准化转换
// standardize the feature vectors
val scalerCats = new StandardScaler(withMean = true, withStd = true).fit(dataCategories.map(lp => lp.features))
val scaledDataCats = dataCategories.map(lp => LabeledPoint(lp.label, scalerCats.transform(lp.features)))

println(dataCategories.first.features)
println(scaledDataCats.first.features)




[-0.02326210589837061,2.7207366564548514,-0.4464212047941535,-0.22052688457880879,-0.028494000387023734,-0.2709990696925828,-0.23272797709480803,-0.2016540523193296,-0.09914991930875496,-0.38181322324318134,-0.06487757239262681,-0.6807527904251456,-0.20418221057887365,-0.10189469097220732,1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]
使用扩展后的特征训练逻辑回归模型
val lrModelScaledCats = LogisticRegressionWithSGD.train(scaledDataCats, numIterations)
val lrTotalCorrectScaledCats = scaledDataCats.map { point =>
if (lrModelScaledCats.predict(point.features) == point.label) 1 else 0
}.sum
val lrAccuracyScaledCats = lrTotalCorrectScaledCats / numData
val lrPredictionsVsTrueCats = scaledDataCats.map { point =>
    (lrModelScaledCats.predict(point.features), point.label)
}
val lrMetricsScaledCats = new BinaryClassificationMetrics(lrPredictionsVsTrueCats)
val lrPrCats = lrMetricsScaledCats.areaUnderPR
val lrRocCats = lrMetricsScaledCats.areaUnderROC

println(f"${lrModelScaledCats.getClass.getSimpleName}\nAccuracy: ${lrAccuracyScaledCats * 100}%2.4f%%\nArea under PR: ${lrPrCats * 100.0}%2.4f%%\nArea under ROC: ${lrRocCats * 100.0}%2.4f%%")

LogisticRegressionModel
Accuracy: 66.5720%
Area under PR: 75.7964%
Area under ROC: 66.5483%
5.3 使用正确的数据格式

使用1-of-k便民店类型特征构建数据集

val dataNB = records.map { r =>
    val trimmed = r.map(_.replaceAll("\"", ""))
    val label = trimmed(r.size - 1).toInt
    val categoryIdx = categories(r(3))
    val categoryFeatures = Array.ofDim(numCategories)
    categoryFeatures(categoryIdx) = 1.0
    LabeledPoint(label, Vectors.dense(categoryFeatures))
}
重新训练贝叶斯模型,并评估性能
val nbModelCats = NaiveBayes.train(dataNB)
val nbTotalCorrectCats = dataNB.map { point =>
if (nbModelCats.predict(point.features) == point.label) 1 else 0
}.sum
val nbAccuracyCats = nbTotalCorrectCats / numData
val nbPredictionsVsTrueCats = dataNB.map { point =>
    (nbModelCats.predict(point.features), point.label)
}
val nbMetricsCats = new BinaryClassificationMetrics(nbPredictionsVsTrueCats)
val nbPrCats = nbMetricsCats.areaUnderPR
val nbRocCats = nbMetricsCats.areaUnderROC

println(f"${nbModelCats.getClass.getSimpleName}\nAccuracy: ${nbAccuracyCats * 100}%2.4f%%\nArea under PR: ${nbPrCats * 100.0}%2.4f%%\nArea under ROC: ${nbRocCats * 100.0}%2.4f%%")

NaiveBayesModel
Accuracy: 60.9601%
Area under PR: 74.0522%
Area under ROC: 60.5138%
5.4 模型参数调优

5.4.1 线性模型

基础优化技术:随机梯度下降(SGD)

辅助函数:根据输入,训练模型

// helper function to train a logistic regresson model
def trainWithParams(input: RDD, regParam: Double, numIterations: Int, updater: Updater, stepSize: Double) = {
    val lr = new LogisticRegressionWithSGD
    lr.optimizer.setNumIterations(numIterations).setUpdater(updater).setRegParam(regParam).setStepSize(stepSize)
    lr.run(input)
}
辅助函数:根据输入数据和分类模型,计算AUC
// helper function to create AUC metric
def createMetrics(label: String, data: RDD, model: ClassificationModel) = {
    val scoreAndLabels = data.map { point =>
      (model.predict(point.features), point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (label, metrics.areaUnderROC)
}

迭代次数调优
// cache the data to increase speed of multiple runs agains the dataset
scaledDataCats.cache

// num iterations
val iterResults = Seq(1, 5, 10, 50).map { param =>
    val model = trainWithParams(scaledDataCats, 0.0, param, new SimpleUpdater, 1.0)
    createMetrics(s"$param iterations", scaledDataCats, model)
}
iterResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

1 iterations, AUC = 64.97%
5 iterations, AUC = 66.62%
10 iterations, AUC = 66.55%
50 iterations, AUC = 66.81%
步长调优
// step size
val numIterations = 10
val stepResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
    val model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater, param)
    createMetrics(s"$param step size", scaledDataCats, model)
}
stepResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

0.001 step size, AUC = 64.95%
0.01 step size, AUC = 65.00%
0.1 step size, AUC = 65.52%
1.0 step size, AUC = 66.55%
10.0 step size, AUC = 61.92%
使用SquaredL2Updater研究正则化参数
// regularization
val regResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
    val model = trainWithParams(scaledDataCats, param, numIterations, new SquaredL2Updater, 1.0)
    createMetrics(s"$param L2 regularization parameter", scaledDataCats, model)
}
regResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }


0.001 L2 regularization parameter, AUC = 66.55%
0.01 L2 regularization parameter, AUC = 66.55%
0.1 L2 regularization parameter, AUC = 66.63%
1.0 L2 regularization parameter, AUC = 66.04%
10.0 L2 regularization parameter, AUC = 35.33%
5.4.2 决策树
辅助函数:接收树的深度和不纯度
// investigate decision tree
def trainDTWithParams(input: RDD, maxDepth: Int, impurity: Impurity) = {
    DecisionTree.train(input, Algo.Classification, impurity, maxDepth)
}



使用Entropy不纯度
// investigate tree depth impact for Entropy impurity
val dtResultsEntropy = Seq(1, 2, 3, 4, 5, 10, 20).map { param =>
    val model = trainDTWithParams(data, param, Entropy)
    val scoreAndLabels = data.map { point =>
      val score = model.predict(point.features)
      (if (score > 0.5) 1.0 else 0.0, point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (s"$param tree depth", metrics.areaUnderROC)
}
dtResultsEntropy.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }


1 tree depth, AUC = 59.33%
2 tree depth, AUC = 61.68%
3 tree depth, AUC = 62.61%
4 tree depth, AUC = 63.63%
5 tree depth, AUC = 64.88%
10 tree depth, AUC = 76.26%
20 tree depth, AUC = 98.45%
使用Gini不纯度
// investigate tree depth impact for Gini impurity
val dtResultsGini = Seq(1, 2, 3, 4, 5, 10, 20).map { param =>
    val model = trainDTWithParams(data, param, Gini)
    val scoreAndLabels = data.map { point =>
      val score = model.predict(point.features)
      (if (score > 0.5) 1.0 else 0.0, point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (s"$param tree depth", metrics.areaUnderROC)
}
dtResultsGini.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }


1 tree depth, AUC = 59.33%
2 tree depth, AUC = 61.68%
3 tree depth, AUC = 62.61%
4 tree depth, AUC = 63.63%
5 tree depth, AUC = 64.89%
10 tree depth, AUC = 78.37%
20 tree depth, AUC = 98.87%
5.4.3 朴素贝叶斯

辅助函数:接收lamda参数

// investigate Naive Bayes parameters
def trainNBWithParams(input: RDD, lambda: Double) = {
    val nb = new NaiveBayes
    nb.setLambda(lambda)
    nb.run(input)
}

val nbResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
    val model = trainNBWithParams(dataNB, param)
    val scoreAndLabels = dataNB.map { point =>
      (model.predict(point.features), point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (s"$param lambda", metrics.areaUnderROC)
}
nbResults.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.2f%%") }

0.001 lambda, AUC = 60.51%
0.01 lambda, AUC = 60.51%
0.1 lambda, AUC = 60.51%
1.0 lambda, AUC = 60.51%
10.0 lambda, AUC = 60.51%
5.4.4 交叉验证

划分训练集和测试集

// illustrate cross-validation
// create a 60% / 40% train/test data split
val trainTestSplit = scaledDataCats.randomSplit(Array(0.6, 0.4), 123)
val train = trainTestSplit(0)
val test = trainTestSplit(1)

测试集的模型性能
val regResultsTest = Seq(0.0, 0.001, 0.0025, 0.005, 0.01).map { param =>
    val model = trainWithParams(train, param, numIterations, new SquaredL2Updater, 1.0)
    createMetrics(s"$param L2 regularization parameter", test, model)
}
regResultsTest.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.6f%%") }

0.0 L2 regularization parameter, AUC = 66.480874%
0.001 L2 regularization parameter, AUC = 66.480874%
0.0025 L2 regularization parameter, AUC = 66.515027%
0.005 L2 regularization parameter, AUC = 66.515027%
0.01 L2 regularization parameter, AUC = 66.549180%
训练集的模型性能
// training set results
val regResultsTrain = Seq(0.0, 0.001, 0.0025, 0.005, 0.01).map { param =>
    val model = trainWithParams(train, param, numIterations, new SquaredL2Updater, 1.0)
    createMetrics(s"$param L2 regularization parameter", train, model)
}
regResultsTrain.foreach { case (param, auc) => println(f"$param, AUC = ${auc * 100}%2.6f%%") }

0.0 L2 regularization parameter, AUC = 66.260311%
0.001 L2 regularization parameter, AUC = 66.260311%
0.0025 L2 regularization parameter, AUC = 66.260311%
0.005 L2 regularization parameter, AUC = 66.238294%
0.01 L2 regularization parameter, AUC = 66.238294%


页: [1]
查看完整版本: Spark机器学习入门4·分类模型(spark-shell)如何实现的