分享

Spark Mllib怎么读取Hive表数据?

kennys 发表于 2017-9-30 16:54:40 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 11044
Hive表20个字段 读取前19个字段作为特征 第20个字段是类别,想用Spark Milib的贝叶斯进行分类 不知道Spark Milib怎么读Hive表转换成训练要求的格式,有没有做过类似的 求详细过程 本人对Spark不太熟

已有(1)人评论

跳转到指定楼层
desehawk 发表于 2017-9-30 21:42:03
楼主参考下面内容

MLlib的例子,基于RDD,从ML的vector转换成MLlib的vector的过程

[mw_shl_code=scala,true]import java.io.{ObjectInputStream, ObjectOutputStream}


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path, FileSystem}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
//import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.util.MLUtils

var modelRF: RandomForestModel = null

val hc = new HiveContext(sc)
import hc.implicits._
// 广告画像构建完毕

// 取样本,样本的第一列为label(0或者1),其他列可能是姓名,手机号,以及真正要参与训练的特征columns
val data = hc.sql(s"""select * from database1.traindata_userprofile""".stripMargin)
////提取schema,也就是表的column name,drop(2)删掉1,2列,只保留特征列
val schema = data.schema.map(f=>s"${f.name}").drop(1)
//ML的VectorAssembler是一个transformer,要求数据类型不能是string,将多列数据转化为单列的向量列,比如把age、income等等字段列合并成一个 userFea 向量列,方便后续训练
val assembler = new VectorAssembler().setInputCols(schema.toArray).setOutputCol("userFea")
val data2 = data.na.fill(-1e9)
val userProfile = assembler.transform(data2).select("label","userFea")

//重点在这:用ML的VectorAssembler构建的vector,必须要有这个格式的转换,从ML的vector转成 MLlib的vector,才能给MLlib里面的分类器使用(这两种vector还真是个坑,要注意)
val userProfile2 = MLUtils.convertVectorColumnsFromML(userProfile, "userFea")
// 取训练样本
val rdd_Data : RDD[LabeledPoint]= userProfile2.rdd.map {
x => val label = x.getAs[Double]("label")
val userFea = x.getAs[Vector]("userFea")
LabeledPoint(label,userFea)
}
// 构建好了训练数据就可以进行训练了, RF的参数如下
val impurity = "gini"
val featureSubsetStrategy = "auto"
// Let The Algorithm Choose
val categoricalFeaturesInfo = Map[Int, Int]()
val iteration = 50
val maxDepth = 9
val numClasses = 2
val maxBins = 32
val numTrees = 70
modelRF = RandomForest.trainClassifier(rdd_Data, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
println("training finished!!!!")
// Evaluate model on test instances and compute test error
val labelAndPreds = userProfile2.rdd.map { x=>
val label = x.getAs[Double]("label")
val userFea = x.getAs[Vector]("userFea")
val prediction = modelRF.predict(userFea)
(label, prediction)
}
labelAndPreds.take(10).foreach(println)
modelRF.save(sc, "/home/user/victorhuang/RFCModel_mllib")
spark.stop()[/mw_shl_code]


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条