本帖最后由 feilong 于 2018-3-30 11:39 编辑
问题导读
1.如何使用Spark实现K-Means聚类k值的选择?
2.K值是不是越大越好?
3.如何使用scala创建一个带给定步长的数字集合?
4.Spark MLib实现K-Means聚类用了哪个算法?
上一篇:Spark 高级分析:第五章第5节 聚类首次尝试
http://www.aboutyun.com/forum.php?mod=viewthread&tid=24140&extra=
两个簇显然是不够的。有多少簇适合这个数据集?很明显,数据中有23个不同的模式,所以看起来k可能至少有23个,或者更有可能。通常,k的许多值都试图找到最好的。但什么是“最好的”?
如果每个数据点都接近最接近的质心,则可以认为聚类是好的。因此,我们定义了一个欧式距离函数,一个函数返回一个数据点到它最近的簇的质心的距离:
[mw_shl_code=scala,true]def distance(a: Vector, b: Vector) =
math.sqrt(a.toArray.zip(b.toArray).
map(p => p._1 - p._2).map(d => d * d).sum)
def distToCentroid(datum: Vector, model: KMeansModel) = {
val cluster = model.predict(datum)
val centroid = model.clusterCenters(cluster)
distance(centroid, datum)
}[/mw_shl_code]
你可以通过解析Scala函数来读出欧几里得距离的定义,反过来:和(sum)的平方(map(d d * d))的差异(map(p p)。在两个向量的对应元素中(a.toArray.zip(b.toArray)),并取平方根(math.sqrt)。
由此,可以定义一个函数,对于一个给定k的模型,该函数测量的是与质心的平均距离:
[mw_shl_code=scala,true]import org.apache.spark.rdd._
def clusteringScore(data: RDD[Vector], k: Int) = {
val kmeans = new KMeans()
kmeans.setK(k)
val model = kmeans.run(data)
data.map(datum => distToCentroid(datum, model)).mean()
}[/mw_shl_code]
现在,这可以用来评估k从5到40的值:
[mw_shl_code=scala,true](5 to 40 by 5).map(k => (k, clusteringScore(data, k))).
foreach(println)[/mw_shl_code]
(x to y by z)语法是一个Scala语法,用于在开始和结束之间(包含)之间创建一个数字集合,在连续的元素之间有一个给定的差异。这是一种简洁的方法,可以为k创建“5、10、15、20、25、30、35、40”的值,然后对每个值执行一些操作。
打印结果显示,分数随着k的增加而降低:
[mw_shl_code=scala,true](5,1938.858341805931)
(10,1689.4950178959496)
(15,1381.315620528147)
(20,1318.256644582388)
(25,932.0599419255919)
(30,594.2334547238697)
(35,829.5361226176625)
(40,424.83023056838846)[/mw_shl_code]
同样,你的值可能也会有所不同。聚类依赖于随机选择的初始质心集。
然而,这是显而易见的。随着更多的簇被添加,应该总是能够使数据点靠近最近的质心。事实上,如果k被选择等于数据点的个数,那么平均距离是0,因为每个点都是它自己的一个集合!
更糟的是,在上面的结果中,k = 35的距离比k = 30要远。这不该发生,高k总是允许至少和低k的簇一样好。问题是,k - means并不一定能找到聚类的最优k值。其迭代过程从随机起点到局部最小值收敛,这可能是好的,但不是最优。
即使使用更智能的方法来选择初始质心,这仍然是正确的。K-means++和K-means||是多种选择算法的变体,它们更倾向于选择不同的、分离的质心,并更可靠地引导出良好的聚类。Spark MLlib实际上实现了K-Means||。然而,在选择中仍然有一个随机性元素,不能保证最优的聚类。
为k = 35所选择的集群的随机启动集可能导致了一个特别的次优簇,或者,它可能在到达局部最优之前就停止了。这可以通过多次为k值运行聚类而得到改进,每次都有一个不同的随机启动centroid设置,并选择最好的簇。该算法开放了setRuns()来设置一个k的集群运行次数。
它还可以通过运行更长时间的迭代来改进。该算法通过setEpsilon()设置一个阈值,该阈值控制被认为具有重要意义的聚类中心运动的最小量;更低的值意味着K-means算法会让centroids继续运行更长时间。
再次运行相同的测试,但是尝试更大的值,从30到100。在下面的示例中,从30到100的范围在Scala中变成了一个并行集合。这将导致每一个k的计算在Spark Shell中并行运行。Spark将同时管理每个计算。当然,每个k的计算也是在集群上的分布式操作。并行中有并行。这可以通过充分利用大型集群来提高总体吞吐量,尽管在某些情况下,同时提交大量的任务会产生相反的效果。
[mw_shl_code=scala,true]...
kmeans.setRuns(10)
kmeans.setEpsilon(1.0e-6)
...
(30 to 100 by 10).par.map(k => (k, clusteringScore(data, k))).
toList.foreach(println)[/mw_shl_code]
这一次,分数持续下降:
[mw_shl_code=scala,true](30,862.9165758614838)
(40,801.679800071455)
(50,379.7481910409938)
(60,358.6387344388997)
(70,265.1383809649689)
(80,232.78912076732163)
(90,230.0085251067184)
(100,142.84374573413373)[/mw_shl_code]
我们仍然不知道如何选择k,因为更大的值应该总是趋向于产生更好的分数。我们想要找到一个点,随着k值的增加分数不会减少很多,或者是k-score的图的拐点,它通常会减少,但最终会变平滑。在这里,它超过100似乎在显著减少。k的正确值可能是100。
|
|