问题导读
1.如何使用Spark实现K-Means聚类?
2.K-Means聚类对输入有何要求?
3.如何查看聚类结果?
上一篇:Spark 高级分析:第五章第3,4节
http://www.aboutyun.com/forum.php?mod=viewthread&tid=24042&extra=
kddcup.data.gz数据文件应该被解压缩并复制到HDFS中。这个示例与其他示例一样,将假定该文件在/user/ds/kddcup.data中可用。打开sparkshell,并将CSV数据作为String类型的RDD加载:
[mw_shl_code=scala,true]val rawData = sc.textFile("hdfs:///user/ds/kddcup.data")[/mw_shl_code]
首先探索数据集,数据中有哪些标签,每个标签又有多少?下面的代码通过将标签转化为label-count元组,根据计数对它们进行排序,并打印结果:
[mw_shl_code=scala,true]rawData.map(_.split(',').last).countByValue().toSeq.
sortBy(_._2).reverse.foreach(println)
在Spark和Scala中可以完成很多事情!有23个不同的标签,最常见的是smurf,neptune攻击:
(smurf.,2807886)
(neptune.,1072017)
(normal.,972781)
(satan.,15892)
...[/mw_shl_code]
注意,数据包含非数值特性。例如,第二列可能是tcp、udp或icmp,但是K-means聚类需要数字特性。最后一列也是非数字的。首先,这些将被忽略。下面的Spark代码将CSV行拆分为列,删除从索引1开始的3个类别值列,并删除最后一列。剩余的值被转换成一个数值数组(Double object),并以元组中的最后一个标签列发出:
[mw_shl_code=scala,true]import org.apache.spark.mllib.linalg._
val labelsAndData = rawData.map { line =>
val buffer = line.split(',').toBuffer
buffer.remove(1, 3)
val label = buffer.remove(buffer.length-1)
val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
(label,vector)
}
val data = labelsAndData.values.cache()[/mw_shl_code]
K-means将只对特征向量进行操作。因此,RDD数据只包含每个元组的第二个元素,在一个元组的RDD中使用值访问它。用Spark MLlib聚类数据就像导入KMeans实现并运行它一样简单。下面的代码将数据群集化,创建一个KMeansModel,然后打印它的centroid:
[mw_shl_code=scala,true]import org.apache.spark.mllib.clustering._
val kmeans = new KMeans()
val model = kmeans.run(data)
model.clusterCenters.foreach(println)[/mw_shl_code]
两个向量将被打印出来,这意味着对于数据来说k均值拟合k = 2的簇。对于已知至少有23种不同类型的连接的复杂数据集,这几乎肯定不足以精确地模拟数据中的不同分组。
这是一个很好的机会,可以使用给定的标签,通过计算每个簇中的标签,来直观地了解这两个簇的内容。下面的代码使用模型将每个数据点分配给一个簇,并计算簇和标签对的出现,并将它们正确地打印出来:
[mw_shl_code=scala,true]val clusterLabelCount = labelsAndData.map { case (label,datum) =>
val cluster = model.predict(datum)
(cluster,label)
}.countByValue
clusterLabelCount.toSeq.sorted.foreach {
case ((cluster,label),count) =>
println(f"$cluster%1s$label%18s$count%8s")
}[/mw_shl_code]
结果表明,聚类方法并无帮助。只有1个数据点出现在第一组!
[mw_shl_code=scala,true]cluster 1!
0 back. 2203
0 buffer_overflow. 30
0 ftp_write. 8
0 guess_passwd. 53
0 imap. 12
0 ipsweep. 12481
0 land. 21
0 loadmodule. 9
0 multihop. 7
0 neptune. 1072017
0 nmap. 2316
0 normal. 972781
0 perl. 3
0 phf. 4
0 pod. 264
0 portsweep. 10412
0 rootkit. 10
0 satan. 15892
0 smurf. 2807886
0 spy. 2
0 teardrop. 979
0 warezclient. 1020
0 warezmaster. 20
1 portsweep. 1[/mw_shl_code]
|
|