问题导读:
1.什么是MLlib?
2.RDD和MR的区别?
MLlib支持本地向量和单机存储的矩阵,或者以一个或多个RDD支持的分布式矩阵。本地向量和矩阵是简单的数据模型,作为公共接口。以下的线性代数操作由Breeze提供。
本地向量
A本地向量存储在单机上,下标从0开始且为整型,值为复数类型。MLlib支持两种类型的本地向量:浓密和稀疏。浓密向量用复数数组作为入口值,稀疏矩阵用两个平行的数组:下标和值。例如,向量 (1.0, 0.0, 3.0)可以表示为浓密格式 [1.0, 0.0, 3.0]或稀疏格式 (3, [0, 2], [1.0, 3.0]),其中的3表示向量尺寸。
本地向量的基类是 Vector,我们提供两种实现方式: DenseVector 和 SparseVector。推荐使用Vectors 实现的工厂方法来新建本地向量。
参考Vector的Scala 文档和Vectors的Scala文档了解API细节。
[mw_shl_code=applescript,true]
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))[/mw_shl_code]
注意: Scala 默认引入 scala.collection.immutable.Vector ,所以要引入 org.apache.spark.mllib.linalg.Vector来显式使用MLlib的向量。
标记的点
一个标记的点是一个本地向量,或者浓密或者稀疏,与标签/反馈相联系。在MLlib, 标记的点用于监督式学习算法。我们使用复数来存储标签,所以可以在回归和分类中使用标记的点。对于二元分类,标签只能是0(负数)或1(正数)。对于多元分类,标签应该是元的编号,且编号从0开始,如 0, 1, 2, ....
一个标记的点用内部类LabeledPoint表示。
参考LabeledPoint Scala文档了解API细节。
[mw_shl_code=applescript,true]mport org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
Sparse data[/mw_shl_code]
实际情况下,训练数据是稀疏的情况很常见。MLlib支持读取以LIBSVM格式存储的训练样本,样本默认格式是LIBSVM 和LIBLINEAR。该格式是文本格式,其中的每行代表打表的稀疏特征向量,格式如下:
[mw_shl_code=applescript,true]label index1:value1 index2:value2 ...[/mw_shl_code]
其中,下标基于1且是递增顺序。加载完毕后,特征下标被转化为基于0。
MLUtils.loadLibSVMFile 读取以LIBSVM格式存储的样本数据。
参考 MLUtils Scala文档了解更多API细节。
[mw_shl_code=applescript,true]import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")[/mw_shl_code]
本地矩阵
本地矩阵的行跟列下标都是整型,且值为复数,存储在单台机器上。MLlib支持浓密矩阵,矩阵值以列顺序存储在单个复数类型的数组,还支持稀疏矩阵,非零的入口值以列顺序存储在压缩的稀疏列。例如,对于浓密矩阵
⎛⎝⎜1.03.05.02.04.06.0⎞⎠⎟
可以存储在一维数组 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] 中,且矩阵大小为 (3, 2)。
本地矩阵的基类为Matrix,而且我们提供两种实现:DenseMatrix和SparseMatrix。我们推荐使用在矩阵中实现的工厂方法来创建本地矩阵。记住:MLlib的本地矩阵以列顺序存储。
参考Matrix Scala docs 和Matrices Scala docs来了解更多API细节。
[mw_shl_code=applescript,true]import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))[/mw_shl_code]
分布式矩阵
分布式矩阵的行跟列下标都是长整型,且矩阵的值为复数类型,分布式存储在一个或多个RDD中。选择正确的格式来存储大且分布的矩阵是非常重要的。将矩阵转化为别的格式可能需要全局洗牌,需要大量计算资源。目前实现了四种分布式矩阵。
基本类型叫做RowMatrix。RowMatrix 是面向行的矩阵,行下标没有重要含义。例如,一个特征向量集合。它是由RDD 的行来保障,每行都是本地向量。我们假设列的数量对于RowMatrix不算多,所以一个本地向量可以与driver通信且可以在单个节点上存储/操作。 IndexedRowMatrix 与相似,但有行下标,可以用于标识行并执行join操作。CoordinateMatrix是以协同列表 (COO) 格式存储的分布式矩阵,由RDD的入口来保障。BlockMatrix是由MatrixBlock的RDD保障的分布式矩阵,格式为(Int, Int, Matrix)的元组。
注意点
分布式矩阵涉及的RDD必须是确定的,因为需要缓存矩阵的尺寸。一般使用不确定的RDD会导致错误。
RowMatrix
RowMatrix是面向行的分布式矩阵且行下标没有具体含义,由RDD的行来保障,每行都是本地矩阵。由于每行都是由本地向量表示,所以列数目限制在整数范围且实际使用中应该更小。
RowMatrix可以由RDD[Vector] 实例创建。然后我们可以计算列的总的统计结果和分解。QR 分解法的形式是 A = QR,其中Q是正交矩阵,R是上三角矩阵。对于单值分解 (SVD)和主要成分分析 (PCA),请查阅降维法。
查看 RowMatrix Scala docs 了解更多API使用详情。
[mw_shl_code=applescript,true]import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// QR decomposition
val qrResult = mat.tallSkinnyQR(true)[/mw_shl_code]
IndexedRowMatrix
IndexedRowMatrix与RowMatrix类似,但行下标有含义。由已编号的多行的RDD来保障,因此每行都由下标(长整型)和本地向量来表示。
IndexedRowMatrix可以由RDD[IndexedRow] 实例创建,其中的IndexedRow是 (Long, Vector)类型的包装。IndexedRowMatrix可以通过丢弃行下标来转化为RowMatrix。
查看 IndexedRowMatrix Scala docs了解更多API使用详情。
[mw_shl_code=applescript,true]import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()[/mw_shl_code]
CoordinateMatrix
CoordinateMatrix由RDD的入口保障的分布式矩阵。每个入口都是 (i: Long, j: Long, value: Double)格式的元组,其中 i 是行下标,j是列下标,value是入口值。CoordinateMatrix只有在矩阵大且非常稀疏的情况下才用到。
CoordinateMatrix可以由RDD[MatrixEntry] 实例创建,MatrixEntry是 (Long, Long, Double)类型的包装。CoordinateMatrix通过调用toIndexedRowMatrix方法可以转化为稀疏行的IndexedRowMatrix。其他的计算或 CoordinateMatrix 目前还不支持 。
参考CoordinateMatrix Scala docs了解更多API使用细节。
[mw_shl_code=applescript,true]import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()[/mw_shl_code]
BlockMatrix
BlockMatrix是通过包含了MatrixBlocks的RDD保障的分布式矩阵,其中的MatrixBlock格式是 ((Int, Int), Matrix)元组,其中 (Int, Int) 是块的下标,Matrix是给定下标,尺寸为 rowsPerBlock x colsPerBlock的子矩阵。BlockMatrix支持将BlockMatrix相加和相乘。BlockMatrix还有帮助函数可以用于检查BlockMatrix 是否创建正确。
创建BlockMatrix最简单的方法可以调用 IndexedRowMatrix 或 CoordinateMatrix 的toBlockMatrix方法。toBlockMatrix 方法默认创建大小为1024 x 1024的块。用户可以通过给方法toBlockMatrix(rowsPerBlock, colsPerBlock)传值来改变块的大小。
参考 BlockMatrix Scala docs 了解更多API的使用方法。
[mw_shl_code=applescript,true]import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()
// Calculate A^T A.
val ata = matA.transpose.multiply(matA)[/mw_shl_code]
|
|