分享

Spark机器学习入门实例——大数据集(30+g)二分类

wuyufei 2015-12-7 16:35:00 发表于 入门帮助 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 17829
本帖最后由 wuyufei 于 2015-12-7 16:54 编辑

问题导读:
1.输入数据与预期结构是什么?
2.如何进行二进制和多标签分类?
3.独立模式下如何运行和设置Apache Spark?
4.如何导入库?
5.如何准备练习和测试数据集?
6.如何准备训练模型?
7.如何准备测试模型?



本篇教程将引领大家,通过使用spark的机器学习性能和 Scala ,练习一个基于超出内存可加载范围的数据集的逻辑回归分类器(即LR分类器)。

假如你想创建一个机器学习模型,但却发现你的输入数据集与你的计算机内存不相符?对于多机器的计算集群环境中通常可以使用如Hadoop和Apache Spark分布式计算工具。然而,Apache Spark能够在本地机器独立模式上,甚至在当输入数据集大于你的计算机内存时通过创建模型处理你的数据。

     在这篇文章里,通过使用一个34.6千兆字节的输入数据集创建一个二进制分类模型,为您展现一个Apache Spark的端对端脚本。

可以在您的计算机上运行进行测试。


1.输入数据和预期结果


在上一篇文章我们讨论了“How To Find Simple And Interesting Multi-Gigabytes Data Set”,本文将使用上文中提及数据集的Posts.xml文件。文件大小是34.6千兆字节,这个xml文件包含stackoverflow.com文章数据作为xml属性:

  • 标题 – 文章标题
  • 主体 – 文章文本
  • 标签 – 文章的标签列表
  • 10+ 更多的xml -我们不需要使用的属性

关于stackoverflow.com的Posts.xml完整数据集信息请点击:https://archive.org/details/stackexchange.

另外我创建一个较小版本的这种文件,里面只有10个条目或文章。此文件包含一个小尺寸的原始数据集,这个数据是被知识共享许可批准的。

如你所料,这个小文件并不是模型训练的最好的选择(这个小模型训练文件并不是最好的选择),这个文件仅适用于实验数据准备代码。然而,本文中的端对端Spark脚本也适用于这个小文件,文件下载请点击这里。

我们的目标是创建一个可基于主体和标题预测文章标签的预测模型。为了精简任务和减少代码数量,我们将联接标题和主体并作为一个单独的文本列。

可想而知,这个模型在stackoverflow.com网站上是怎样工作的——用户键入一个问题,网站自动给予标签建议。

假设我们需要尽可能多的正确的标签,并且用户将消除不必要的标签。由于这个假设我们将选择撤销作为我们的模型高优先级目标。


2.二进制和多标签分类

栈溢出标记预测问题属于多标签分类的一种但并不唯一,因为模型应当预测许多分类。相同的文本将被归类为“Java”和“多线程”。注意多标签分类是不同的问题的一个泛化 ——多分类问题,从一组类预测为仅仅一个类。为了简化我们的第一个Apache Spark问题以及减少代码数量,让我们开始简化问题吧。取代练习一个多标记分类器,我们来对一个给定的标签练习一个简单的二进制分类器。例如,对标签“Java”,创建一个能够预测关于Java语言文章的分类器。

通过使用这个简单方法,可以创建几乎所有常见标签的分类器(Java, C++, Python, multi-threading等)。这是一个简单而易学的方法。然而,在实践中却并不完美,因为拆分预测模型基于各自的分类器,你忽略了类之间的相关性。另一个原因——训练许多分类器可能需要大量的计算。


3.在独立模式下设置和运行Apache Spark

如果你的机器上没有Apache Spark,从Spark官网上可以很容易下载到: http://spark.apache.org/,请下载使用1.5.1版本。spark-1.5.1版本的下载链接—— http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz

如果Java在你的计算机上已经安装了,那么你可以准备启动Spark了,如果没有的话——请先安装Java。

在Unix系统和Macs系统上,解压缩该文件并复制到任意目录中。现在这个目录就是Spark的目录了。

运行Spark master:

[mw_shl_code=bash,true]sbin/start-master.sh[/mw_shl_code]

运行spark slave:

[mw_shl_code=bash,true]sbin/start-slaves.sh[/mw_shl_code]

运行Spark shell:

[mw_shl_code=bash,true]bin/spark-shell[/mw_shl_code]

Spark shell可以交互模式运行Scala命令。

Windows用户可以在这里找到命令:http://nishutayaltech.blogspot.in/2015/04/how-to-run-apache-spark-on-windows7-in.html

如果你工作在Hadoop环境的集群模式下,我假定你已经知道怎样运行Spark shell了。


4.导入库

对于这个端到端场景我们将使用Scala,也是Apache Spark的主要编程语言。

[mw_shl_code=bash,true]// General purpose library
import scala.xml._

// Spark data manipulation libraries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification
    .LogisticRegression
import org.apache.spark.mllib.evaluation
    .BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline[/mw_shl_code]

5.解析XML

我们需要从输入的xml文件中提取主体、文本和标签,并用这些列创建一个单独的data-frame。首先,让我们移除xml的页眉和页脚。我假定输入文件和Spark shell命令位于同一目录下。

[mw_shl_code=bash,true]val fileName = "Posts.small.xml"
val textFile = sc.textFile(fileName)
val postsXml = textFile.map(_.trim).
   filter(!_.startsWith("<?xml version=")).
   filter(_ != "<posts>").
   filter(_ != "</posts>")[/mw_shl_code]

Spark具有良好的函数可用于解析json和csv格式。对于Xml,我们需要编写一些额外的代码行,通过规格化编码模式来创建一个数据框架。

注意,Scala语言自动转换所有xml代码,像“<a>”转换到实际标签“<a>”。我们也将连接标题和主体、移除所有的不必要的标签以及来自主体和所有空间副本的新行字符。

[mw_shl_code=bash,true]val postsRDD = postsXml.map { s =>
   val xml = XML.loadString(s)

   val id = (xml \ "@Id").text
   val tags = (xml \ "@Tags").text

   val title = (xml \ "@Title").text
   val body = (xml \ "@Body").text
   val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ")
   val text = (title + " " + bodyPlain).replaceAll("\n",
      " ").replaceAll("( )+", " ");

   Row(id, tags, text)
}[/mw_shl_code]

创建一个data-frame,schema应当应用于RDD(弹性分布式数据集,Resilient Distributed Datasets)

[mw_shl_code=bash,true]val schemaString = "Id Tags Text"
val schema = StructType(
   schemaString.split(" ").map(fieldName =>
      StructField(fieldName, StringType, true)))

val postsDf = sqlContext.createDataFrame(postsRDD, schema)[/mw_shl_code]

现在你可以查看一下你的数据框架了。


6.准备练习和测试数据集

下一步——为一个二进制分类器创建一个二进制标签。对这个代码示例,我们将用“java”作为一个标签,我们想要用一个二进制分类器来预测。所有存在“java”标签的行被标记为“1”,否则被标记为“0”。让我们来辨别我们的目标标记“java”和基于这个标记创建二进制标签。

[mw_shl_code=bash,true]val targetTag = "java"
val myudf: (String => Double) = (str: String) =>
    {if (str.contains(targetTag)) 1.0 else 0.0}
val sqlfunc = udf(myudf)
val postsLabeled = postsDf.withColumn("Label",
    sqlfunc(col("Tags")) )[/mw_shl_code]

通过使用新标签,数据集可分为消极的子集和积极的子集。

[mw_shl_code=bash,true]val positive = postsLabeled.filter('Label > 0.0)
val negative = postsLabeled.filter('Label < 1.0)[/mw_shl_code]

我们将使用数据的90%用于模型训练和另外10%作为数据集测试。让我们通过独立抽样积极和消极数据集来创建一个训练数据集吧。

[mw_shl_code=bash,true]val positiveTrain = positive.sample(false, 0.9)
val negativeTrain = negative.sample(false, 0.9)
val training = positiveTrain.unionAll(negativeTrain)[/mw_shl_code]

测试数据集应当包括训练数据集中不包括的所有的行。并且再次地——分别地处理积极样例和消极样例

[mw_shl_code=applescript,true]val negativeTrainTmp = negativeTrain
    .withColumnRenamed("Label", "Flag").select('Id, 'Flag)

val negativeTest = negative.join( negativeTrainTmp,
    negative("Id") === negativeTrainTmp("Id"),
    "LeftOuter").filter("Flag is null")
    .select(negative("Id"), 'Tags, 'Text, 'Label)

val positiveTrainTmp = positiveTrain
    .withColumnRenamed("Label", "Flag")
    .select('Id, 'Flag)

val positiveTest = positive.join( positiveTrainTmp,
    positive("Id") === positiveTrainTmp("Id"),
    "LeftOuter").filter("Flag is null")
    .select(positive("Id"), 'Tags, 'Text, 'Label)

val testing = negativeTest.unionAll(positiveTest)[/mw_shl_code]



7.训练模型

定义训练参数:

1.特征数目

2.回归参数

3.梯度下降的样本点数目

Spark API 创建一个基于来自 data-frame和训练参数的列的模型 :

[mw_shl_code=bash,true]val numFeatures = 64000
val numEpochs = 30
val regParam = 0.02

val tokenizer = new Tokenizer().setInputCol("Text")
    .setOutputCol("Words")

val hashingTF = new  org.apache.spark.ml.feature
    .HashingTF().setNumFeatures(numFeatures).

    setInputCol(tokenizer.getOutputCol)
    .setOutputCol("Features")

val lr = new LogisticRegression().setMaxIter(numEpochs)
    .setRegParam(regParam)setFeaturesCol("Features")
    .setLabelCol("Label").setRawPredictionCol("Score")
    .setPredictionCol("Prediction")

val pipeline = new Pipeline()
    .setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(training)[/mw_shl_code]

8.测试模型

这是我们最后一段二进制“Java”分类器代码,将返回一个预测值(0.0或1.0):

[mw_shl_code=bash,true]val testTitle =
"Easiest way to merge a release into one JAR file"

val testBoby =
"""Is there a tool or script which easily merges a bunch
of href="http://en.wikipedia.org/wiki/JAR_%28file_format
%29" JAR files into one JAR file? A bonus would be to
easily set the main-file manifest and make it executable.
I would like to run it with something like: As far as I
can tell, it has no dependencies which indicates that it
shouldn't be an easy single-file tool, but the downloaded
ZIP file contains a lot of libraries."""

val testText = testTitle + testBody

val testDF = sqlContext
   .createDataFrame(Seq( (99.0, testText)))
   .toDF("Label", "Text")

val result = model.transform(testDF)

val prediction = result.collect()(0)(6)
   .asInstanceOf[Double]

print("Prediction: "+ prediction)[/mw_shl_code]

让我们评估基于训练数据集的模型的质量。

[mw_shl_code=bash,true]val testingResult = model.transform(testing)

val testingResultScores = testingResult
   .select("Prediction", "Label").rdd
   .map(r => (r(0).asInstanceOf[Double], r(1)
   .asInstanceOf[Double]))

val bc =
   new BinaryClassificationMetrics(testingResultScores)

val roc = bc.areaUnderROC

print("Area under the ROC:" + roc)[/mw_shl_code]

如果你使用的是小规模的数据集,那么你的模型质量可能不是最好的。ROC值下的面积将非常低(接近50%),意味着一个差的模型质量。如果使用的是整个的Post.xml数据集,质量就不会这么差,ROC下的面积是0.64。或许你可以通过考虑不同的转换,如TF-IDF和规范化来改善结果,但本文中不再详述。

原文作者:Dmitry Petrov  由36大数据翻译组-元曜+BI+刘亭亭翻译,本文链接http://www.36dsj.com/archives/37699



已有(2)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条