分享

ML Pipelines:Spark 1.2中一个用于MLlib的High-Level API

本帖最后由 pig2 于 2015-1-30 20:55 编辑
问题导读
1、什么是High-Level API?

2、如何查看更多SchemaRDD特性?
3、如何理解ML算法?





摘要:
在Spark 1.2中,通过与AMPLab(UC Berkeley)合作,一个 pipeline API被添加到MLlib,再次简化了MLlib的建立工作,并添加了针对ML pipelines的调优机制。
在每次版本更新中,除下新算法和性能升级,Databricks在MLlib的易用性上同样投入了大量精力。类似Spark Core,MLlib提供了3个编程语言的API:Python、Java和Scala。除此之外,MLlib同样提供了代码示例,以方便不同背景用户的学习和使用。在Spark 1.2中,通过与AMPLab(UC Berkeley)合作,一个 pipeline API被添加到MLlib,再次简化了MLlib的建立工作,并添加了针对ML pipelines的调优机制。

实际应用中,一个ML pipeline往往包括一系列的阶段,比如数据预处理、特征提取、模型拟合及可视化。举个例子,文本分类可能就会包含文本分割与清洗、特征提取,并通过交叉验证训练一个分类模型。虽然当下每个步骤都有许多库可以使用,但是将每个步骤连接起来却并不是件容易的事情,特别是在大规模场景下。当下,大部分的库都不支持分布式计算,或者他们并不支持一个原生的pipeline建立和优化。不幸的是,这个问题经常被学术界所忽视,而在工业界却又不得不重点讨论。

本篇博文将简述Databricks和AMPLab在ML pipeline(MLlib)所做的工作,其中有些设计由scikit-learn项目和一些前期MLI工作启发而来。

Dataset Abstraction

在新的pipeline设计时,数据集通常由Spark SQL的SchemaRDD以及ML pipeline的一系列数据集转换表现。每个转换都会摄入一个输入数据集,并输出一个已转换数据集,同时输出数据集将成为下一个步骤的输入数据集。之所以使用Spark SQL,主要考虑到以下几个因素:数据导入/输出、灵活的列类型和操作,以及执行计划优化。

数据的输入和输出是一个ML pipeline的起点和终点。MLlib当下已为数种类型提供了实用的输入和输出工具,其中包括用于分类和回归的LabeledPoint、用于协同过滤的Rating等。然而真实的数据集可能会包含多种类型,比如用户/物品 ID、时间戳,亦或是原始记录,而当下的工具并没有很好地支持所有这些类型。同时,它们还使用了从其他ML库中继承的无效率文本存储格式。

通常主流的ML pipeline都会包含特征转换阶段,你可以把特征转换看成在现有列上加上一个新列。举个例子,比如:文本分词将文档拆成大量词,而tf-idf则将这些词转换为一个特征向量。在这个过程中,标签会被加工用于模型拟合。同时,在实际过程中,更复杂的特征转换也经常会出现。因此,数据集需要支撑不同类型的列,包括密集/稀疏向量,以及为现有列建立新列的操作。
54bf3995da309_middle.png


在上面这个例子中,id、text以及words在转换中都会被转入。在模型拟合中,它们是不需要的,但是在预测和模型校验时它们又会被用到。如果预测数据集只包含predicted labels,那么它们不会提供太多的信息。如果我们希望检验预测结果,比如检验false positives,那么结合predicted labels 、原始输入文本及tokenized words则是非常有必要的。这样一来,如果底层执行引擎经过优化,并且只加载所需列将是很必要的。

幸运的是,Spark SQL已经提供了大多数所期望的功能,机构不需要再重新开始。Spark支持从Parque读取SchemaRDDs,并支持将SchemaRDDs写入对应的Parque。Parque是一个非常有效的列存储格式,可以在RDDs和SchemaRDDs之间自由转换,它同时还支持Hive和Avro这样的外部数据源。使用Spark SQL,建立(说声明可能更为准确)新列将非常便捷和友好。SchemaRDD实体化使用了lazy模式,Spark SQL可以基于列的需求来优化执行计划,可以较好的满足用户需求。SchemaRDD支持标准的数据类型,为了让其可以更好地支持ML,技术团队为其添加了对向量类型的支持(用户定义类型),同时支持密集和稀疏特征向量。

下面是一段Scala代码,它实现了ML数据集导入/输出,以及一些简单的功能。在Spark知识库“examples/”目录下,你发现一些更加复杂的数据集示例(使用Scala和Python)。在这里,我们推荐用户阅读Spark SQL’s user guide以查看更多SchemaRDD详情,以及它所支撑的操作。
  1. val sqlContext = SQLContext(sc)
  2. import sqlContext._ // implicit conversions
  3. // Load a LIBSVM file into an RDD[LabeledPoint].
  4. val labeledPointRDD: RDD[LabeledPoint] =
  5.   MLUtils.loadLibSVMFile("/path/to/libsvm")
  6. // Save it as a Parquet file with implicit conversion
  7. // from RDD[LabeledPoint] to SchemaRDD.
  8. labeledPointRDD.saveAsParquetFile("/path/to/parquet")
  9. // Load the parquet file back into a SchemaRDD.
  10. val dataset = parquetFile("/path/to/parquet")
  11. // Collect the feature vectors and print them.
  12. dataset.select('features).collect().foreach(println)
复制代码


Pipeline

新的Pipeline API位于名为“spark.ml”的包下。Pipeline由多个步骤组成, 这些步骤一般可分为两个类型: Transformer和Estimator。Transformer会摄入一个数据集,并输出一个新的数据集。比如,分词组件就是一个Transformer,它会将一个文本数据集转换成一个tokenized words数据集。Estimator首先必须满足输入数据集,并根据输入数据集产生一个模型。举个例子,逻辑归回就是一个Estimator,它会在一个拥有标签和特征的数据集上进行训练,并返回一个逻辑回归模型。

Pipeline建立起来比较简单:简单的声明它的步骤,配置参数,并将在一个pipeline object中进行封装。下面的代码演示了一个简单文本分类pipeline,由1个分词组件、1个哈希Term Frequency特征抽取组件,以及1个逻辑回归。
  1. val tokenizer = new Tokenizer()
  2.   .setInputCol("text")
  3.   .setOutputCol("words")
  4. val hashingTF = new HashingTF()
  5.   .setNumFeatures(1000)
  6.   .setInputCol(tokenizer.getOutputCol)
  7.   .setOutputCol("features")
  8. val lr = new LogisticRegression()
  9.   .setMaxIter(10)
  10.   .setRegParam(0.01)
  11. val pipeline = new Pipeline()
  12.   .setStages(Array(tokenizer, hashingTF, lr))
复制代码


Pipeline的本身就是个Estimator,因此我们可以轻松的使用。
  1. val model = pipeline.fit(trainingDataset)
复制代码


拟合模型包括了分词组件、哈希TF特征抽取组件,以及拟合逻辑回归模型。下面的图表绘制了整个工作流,虚线部分只在pipeline fitting中发生。
121.png


这个拟合Pipeline模型是个Transformer,可以被用于预测、模型验证和模型检验。
  1. model.transform(testDataset)
  2.   .select('text, 'label, 'prediction)
  3.   .collect()
  4.   .foreach(println)
复制代码


在 ML算法上,有一个麻烦的事情就是它们有许多hyperparameters需要被调整。同时,这些hyperparameters与被MLlib优化的模型参数完全不同。当然,如果缺乏数据和算法上的专业知识,我们很难发现这些hyperparameters组合的最优组合。然而,即使有专业知识,随着pipeline和hyperparameters规模的增大,这个过程也将变得异常复杂。而在实践中,hyperparameters的调整却通常与最终结果戚戚相关。举个例子,在下面的pipeline中,我们有两个hyperparameters需要调优,我们分别赋予了3个不同的值。因此,最终可能会产生9个不同的组合,我们期望从中找到一个最优组合。
11.png


在这里,spark支持hyperparameter的交叉验证。交叉验证被作为一个元方法,通过用户指定参数组合让其适合底层Estimator。这里的Estimator可以是一个pipeline,它可以与 Evaluator组队并输出一个标量度量用于预测,比如精度。调优一个Pipeline是非常容易的:
  1. // Build a parameter grid.
  2. val paramGrid = new ParamGridBuilder()
  3.   .addGrid(hashingTF.numFeatures, Array(10, 20, 40))
  4.   .addGrid(lr.regParam, Array(0.01, 0.1, 1.0))
  5.   .build()
  6. // Set up cross-validation.
  7. val cv = new CrossValidator()
  8.   .setNumFolds(3)
  9.   .setEstimator(pipeline)
  10.   .setEstimatorParamMaps(paramGrid)
  11.   .setEvaluator(new BinaryClassificationEvaluator)
  12. // Fit a model with cross-validation.
  13. val cvModel = cv.fit(trainingDataset)
复制代码


当然,在一个ML pipeline中,用户可以嵌入自己的transformers或者estimators是非常重要的(建立在用户已经实现了pipeline接口的情况下)。这个API让MLlib外部代码的使用和共享变得容易,我们建议用户去阅读spark.ml user guide以获得关于pipeline API的更多信息。

结语

本篇博文介绍了Spark 1.2中引入的ML pipeline API,以及这个API的运行原理——需要由 SPARK-3530、SPARK-3569、 SPARK-3572、SPARK-4192和SPARK-4209多个JIRAs完成。我们建议用户阅读JIRA页面上公布的设计文档以获得更多消息和设计选择。需要提及的是,Pipeline API的开发并没有全部完成。同时,在Pipeline API之外,还有一些相关的工作需要完成,比如:SPARK-5097,需要添加一个data frame APIs到SchemaRDD;SPARK-4586,需要一个ML pipeline Python API;SPARK-3702,用于学习算法和模型的类层次结构。



###########################################
本文转载自:https://databricks.com/blog/2015 ... -api-for-mllib.html


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条