xzc0202 发表于 2017-10-30 00:52:43

Spark机器学习库(MLlib)之3:管道

本帖最后由 xzc0202 于 2017-10-30 13:39 编辑

问题向导:
(1)MLlib中的管道是什么?
(2)管道组件有哪些?
(3)管道具体工作原理?

static/image/hrline/4.gif

上一篇Spark机器学习库(MLlib)之2:基础统计
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22722

1.管道
本节将介绍机器学习管道的概念。机器学习管道提供一套高级的API,帮助用户创建、调试机器学习工作流。

1.1.管道主要概念
MLlib标准化API用于将多种机器学习算法组合到一个管道或者工作流中。本节将通管道API介绍管道主要概念,管道的概念主要源于scikit-learn项目

[*]数据框(DataFrame):机器学习API使用Spark SQL的DataFrame作为机器学习的数据集,它可以处理多种数据类型,如:一个DataFrame可以有不同的列存储文本、特征向量、标签值和预测值.

[*]转换器(Transformer):转换器是将一个DataFrame为另一个DataFrame的算法,如:机器学习模型就是一个把特征值DataFrame转为预测值DataFrame的转换器.

[*]估计器(Estimator):估计器是通过拟合一个DataFrame生成转换器(Transformer)的算法。如:机器学习算法就是一个通过训练DataFrame生成预测模型的估计器.

[*]管道(Pipeline):管道就是串连多个转换器或者估计器,一起构成机器学习(ML)工作流。

[*]参数(Parameter):所有转换器和估计器用于指定参数的通用API.

1.2.数据框(DataFrame)
机器学习需要使用不同类型的数据,如向量、文本、图像及结构化数据,使用来自于Spark SQL的DataFrame能够满足数据类型的多样性。
DataFrame支持多种基础数据和结构数据;Spark SQL支持数据类型详见:the Spark SQL datatype reference.除spark sql中数据类型外,DataFrame还支持机器学习向量类型.
DataFrame可以通过RDD显示或者隐式的方式来建立,代码样例详见:Spark SQL programming guide
DataFrame的列需要命名,代码示例中使用如"text","features","label"等名称.

1.3.管道组件
1.3.1.转换器(Transformers)
转换器是一个包含特征转换和学习模型的抽像概念,技术上,转换器通过transform()方法,把一个DataFrame通过增加1到多列转为另一个DataFrame,如:

[*]一个特征转换器输入一个DataFrame,读取一个文本列,将其映射为新的特征向量列。输出一个新的带有特征向量列的DataFrame。
[*]一个学习模型转换器输入一个DataFrame,读取包括特征向量的列,预测每一个特征向量的标签。输出一个新的带有预测标签列的DataFrame
1.3.2.估计器(Estimator)
估计器是用来适配与训练数据的一个或多个机器学习算法的抽象概念,从技术上讲,一个估计器实现了方法fit(),方法用来接收一个DataFrame产生一个模型,产生的模型是一个转换器,比如,一个机器学习算法如逻辑回归(LogisticRegression )是一个估计器,通过调用fit()训练生成一个逻辑回归模型,它是一个模型也是一个转换器.
1.3.3.管道组件属性
Transformer.transform()和Estimator.fit()都是无状态的,将来,带状态的算法可能被支持.
转换器与估计器的每个实例有唯一的ID,在具体的参数中非常有用(后面具体讨论)

1.4.管道
机器学习中,运行一系列的算法来处理与学习数据是很常见的,比如,一个简单的文本文档处理流可能包含下列阶段:

[*]分割文档文本为单词集合
[*]转换每一个文档单词为数字向量特征
[*]用特征向量与标签学习一个预测模型
MLlib中称这种工作流为管道,它包含一系列的按指定顺序运行的管道阶段(转换器与估计器)。本节我们将使用上面的简单工作流做为运行样例.

1.4.1.工作原理
一个管道指定一系列的阶段,每个阶段是一个转换器或者估计器,这些阶段按顺序执行,输入的DataFrame在每个阶段中传递并进行相应的转换。在转换器(Transformer)阶段时,transform()方法会被调用用来转换输入的DataFrame。在估计器(Estimator)阶段时,fit()方法被调用用来产生一个转换器(Transformer)(这个转换器Transformer将成为PiplineModel的一部分,或者应用到Pipline),接着这个转换器的transform()方法将被调用并转换输入的DataFrame。
我们举一个简单的文本文档工作流进行说明。下图是一个作为训练过程的Pipline的用法:



在上图中,上面一行代表Pipline的三个stage阶段。前面两个(Tokenizer和HashingTF)是转换器(Transformer)(蓝色),第三个(LogisticRegression)是一个估计器(Estimator)(红色)。下面一行代表pipline的数据流,圆柱代表DataFrame。调用Pipline.fit()方法处理最初的DataFrame,这个最初的DataFrame有一行行的文本文档和标签。Tokenizer.transform()方法将一行行的文本文档切分成词,向DataFrame添加了一个以词为列的新的列。HashingTF.transform()方法将词列转换为特征向量,向DataFrame添加了一个以这些向量为列的新列。现在,因为LogisticRegression是一个Estimator,Pipline首先调用LogisticRegression.fit()来产生一个LogisticRegressionModel。如果这个Pipline有更多的阶段,它将在把这个DataFrame传给下一个阶段之前调用LogisticRegressionModel的transform()方法。

一个管道其实就相当于一个估计器(Estimator),因此管道的fit()该当执行后,就生成一个转换器的管道模型(PipelineModel).这个管道模型通常用于测试数据.下图将说明具体用法:

在上图中,管道模型(PipelineModel)和原始管道有相同数量的阶段,但是在原始管道中的估计器(Estimator)此时已换成了转换器(Transformers)。当管道模型(PipelineModel)的transform()方法被调用于测试数据集时,这些数据是按照顺序通过管道。每个阶段的transform方法更新数据并将其传递到下一个阶段。

在管道(Pipline)和管道模型(PiplineModel)的帮忙下,有助于使训练数据和测试数据都经过同样的特征处理步骤。

1.4.2.细节

[*]DAG管道:管道的具体阶段(stages)由一个有序的数组来定义。这里给出的示例都是线性管道,也就是说管道的每个阶段使用的数据都是由上一个阶段产生。我们也可以产生非线性的管道,但需要数据流向为无向无环图(DAG)时。这种图通常需要明确地指定每个阶段的输入和输出列名(通常以指定参数的形式)。如果管道是DAG形式,则每个阶段必须以拓扑序的形式指定。
[*]运行时检查:因为管道可以运行在多种数据类型上,所以不能使用编译时检查。管道和管道模型在实际运行管道之前就会进行运行时检查。这种检查通过DataFrame schema,它描述了数据框中各列的类型。
[*]管道阶段唯一:管道的每个阶段需要是唯一的实体。如同样的实体“myHashingTF”不可以进入管道两次,因为管道的每个阶段必须有唯一的ID。当然“myHashingTF1”和“myHashingTF2”(都是"HashingTF"的实例)可以进入同个管道两次,因为他们有不同的ID

1.4.3.参数
MLlib估计器和转换器使用统一的接口来指定参数。
Param是有完备文档的已命名参数。ParamMap是一些列“参数-值”对。
有两种主要的方法来向算法传递参数:

[*]给实体设置参数。比如,lr是一个逻辑回归实体,通过lr.setMaxIter(10)来使得lr在拟合的时候最多迭代10次。这个接口与spark.mllib包相似。
[*]传递ParamMap到fit()或者transform()。所有在ParamMap里的参数都将通过设置被重写。
参数属于指定估计器和转换器实体过程。因此,如果我们有两个逻辑回归实体lr1和lr2,我们可以建立一个ParamMap来指定两个实体的最大迭代次数参数:ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。这在一个管道里有两个算法都有最大迭代次数参数时非常有用。

1.4.4.存储和读取管道
我们经常需要将管道存储到磁盘以供下次使用。从Spark1.6开始,模型导入/导出功能新添了管道接口,支持大多数转换器。具体请查阅算法接口文档是否支持存储和读入。

1.5.样例代码
下面给出上述讨论功能的代码示例,更多信息请查看API文档(Scala, Java, and Python):
1.5.1.样例:估计器、转换器和参数
本样例覆盖估计器、转换器和参数的内容
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30)// Specify 1 Param. This overwrites the original maxIter.
.put(lr.regParam -> 0.1, lr.threshold -> 0.55)// Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")// Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

// Prepare test data.
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
完整代码路径: "examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala"
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Prepare training data.
List<Row> dataTraining = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
    RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
    new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");

// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01);

// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());

// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
.put(lr.maxIter().w(20))// Specify 1 Param.
.put(lr.maxIter(), 30)// This overwrites the original maxIter.
.put(lr.regParam().w(0.1), lr.threshold().w(0.55));// Specify multiple Params.

// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
.put(lr.probabilityCol().w("myProbability"));// Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());

// Prepare test documents.
List<Row> dataTest = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
    RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
    RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense()),
    (0.0, Vectors.dense()),
    (0.0, Vectors.dense()),
    (1.0, Vectors.dense())], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap = 30# Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})# Specify multiple Params.

# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"}# Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense()),
    (1.0, Vectors.dense())], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))

1.5.2.样例:管道
本样例具体实现了上面图例中“简单文本文档”管道
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
完整代码路径: "examples/src/main/scala/org/apache/spark/examples/ml/PipelineExample.scala"
import java.util.Arrays;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
new JavaLabeledDocument(1L, "b d", 0.0),
new JavaLabeledDocument(2L, "spark f g h", 1.0),
new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words");
HashingTF hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);

// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
new JavaDocument(4L, "spark i j k"),
new JavaDocument(5L, "l m n"),
new JavaDocument(6L, "spark hadoop spark"),
new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);

// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=)

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

1.6.模型选择(超参数调优)
一个使用ML Pipline非常大的好处就是超参数优化。查阅ML Tuning Guide获取更多的关于模型自动选择的信息。

英文链接:http://spark.apache.org/docs/latest/ml-pipeline.html


[*]tips:

[*]转换器有点类似于数据加工,估计器其实就是建模,得到模型后调用transform()方法用来预测数据.
[*]管道就是把多个(转换器和估计器)串联起来变成一个新的估计器,调用这个估计器的fit()方法就会建模得到一个叫做PipelineModel的模型。有PipelineModel模型就可以预测数据了,只不过预测数据时是按照管道定义的多个(转换器和估计器[被换成相应的模型])串联步骤执行的。



美丽天空 发表于 2017-10-31 13:05:49

感谢分享
页: [1]
查看完整版本: Spark机器学习库(MLlib)之3:管道