Spark 高级分析:第二章第9,10节
本帖最后由 feilong 于 2017-11-3 09:10 编辑问题导读
1.哪些场景使用以及如何使用map方法?
2.apply方法有何作用?
3.reduce方法有何作用?
static/image/hrline/4.gif
上一篇:Spark 高级分析:第二章第7,8节
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23197&extra=
第9节 Summary Statistics For Continuous Variables连续变量的汇总统计
Spark的countByValue 操作是创造相对较低的基数分类变量数据直方图的一个好方法。但是对于连续变量,如患者记录中每个字段的匹配分值,我们希望能够快速得到关于它们分布的基本统计数据集,如平均值、标准差和极值值,如最大值和最小值。
对于RDD 实例,Spark API提供了通过隐式类型转换action的一个额外的设置李类似于toInt方法用于String类。这些内隐行为允许我们将以有用的方式扩展RDD的功能,当我们有更多关于如何处理它所包含的值的信息时。
Pair RDDs 此外,RDD 隐式action,Spark支持RDD [ tuple2 ]类型隐式类型转换,提类似于groupbykey和reducebykey供关键的聚合方法,以及复数个RDD按相同类型key join的方法。
RDD 隐式函数ststs会提供我们需要的RDD统计值。可以在解析好的RDD中的MatchData记录中的数组第一个值尝试这个函数:parsed.map(md => md.scores(0)).stats()StatCounter = (count: 5749132, mean: NaN, stdev: NaN, max: NaN, min: NaN) 不幸的是,使用在我们的数组的丢失的占位符NaN值干扰了Spark的汇总统计。更可惜的是,Spark目前没有一个很好的方式排除与/或计数的缺失值,所以我们必须Java的Double isNaN函数手动过滤掉使用:import java.lang.Double.isNaNparsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()StatCounter = (count: 5748125, mean: 0.7129, stdev: 0.3887, max: 1.0, min: 0.0) 如果我们倾向于这样,我们就可以用这种方式得到分数数组中所有值的统计信息,使用Scala的Rang构造创建一个循环,遍历每个索引值并计算列的统计数据,如下所示:val stats = (0 until 9).map(i => {parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()})stats(1)...StatCounter = (count: 103698, mean: 0.9000, stdev: 0.2713, max: 1.0, min: 0.0)stats(8)...StatCounter = (count: 5736289, mean: 0.0055, stdev: 0.0741, max: 1.0, min: 0.0)
第10节 Creating Reusable Code For Computing Summary Statistics创建用于计算汇总统计的可重用代码
这种方法虽然能够完成任务,但是很低效的;我们必须重新处理解析好的RDD中所有的记录九次为了计算所有的统计。当我们的数据集越来越大时,重新处理所有数据的成本不断上升,即使我们在内存中缓存了中间结果,以节省一些处理时间。当我们开发带有Spark的分布式算法时,我们可以花些时间来研究如何计算。我们可需要的答案能是尽可能少地传递数据。在这种情况下,我们想出一个办法来写一个函数,将接收任何RDD ]类型输入,返回一个包括a)各指标缺失值的数量,b)与每个指标的非缺失值的汇总统计StatCounter的对象的数组。
每当我们期望我们需要执行的一些分析任务一次又一次地有用时,花一些时间开发我们的代码是很有价值的,这样一来,其他分析人员就可以很容易地使用我们自己分析得出的解决方案。为了做到这一点,我们可以在一个单独的文件中编写Scala代码,然后我们可以将它加载到Spark Shell中进行测试和验证,一旦我们知道了它的工作原理,我们就可以与其他人共享该文件。
这需要代码复杂度的提高。我们不需要处理一行或两行的单独方法调用和函数,而是需要创建适当的Scala类和API,这意味着使用更复杂的语言特性。
对于缺少的值分析,我们的第一个任务是编写一个Spark计数模拟器,正确处理丢失的值。在你的客户机shell中,打开一个文件,命名为StatsWithMissing.scala,并复制下面的类定义的文件。我们将浏览下面定义的各个字段和方法。import org.apache.spark.util.StatCounterclass NAStatCounter extends Serializable {val stats: StatCounter = new StatCounter()var missing: Long = 0def add(x: Double): NAStatCounter = {if (java.lang.Double.isNaN(x)) {missing += 1} else {stats.merge(x)}this}def merge(other: NAStatCounter): NAStatCounter = {stats.merge(other.stats)missing += other.missingthis}override def toString = {"stats: " + stats.toString + " NaN: " + missing}}object NAStatCounter extends Serializable {def apply(x: Double) = new NAStatCounter().add(x)}
我们的NAStatCounter 类有两个成员变量:一个不变的StatCounter的命名实例stats,和一个可变长变量名为missing。请注意,我们标记为可序列化类是因为我们将在Spark RDDS中使用这个类的实例,如果Spark无法序列化包含在一个RDD中的数据作业就会失败。
类中第一个方法add,使NAStatCounter对象能够新增一个Double类型值,如果一条记录是NaN则记录数加1否则将其新增到StatCounter。merge方法合并跟踪的另一个NAStatCounter实例为当前实例的统计。这两种方法都返回this,以便它们可以很容易地链接在一起。
最后,我们重写nastatcounter类的toString方法,让我们可以在Spark Shell把内容打印出来。每当我们从Scala中的父类重写方法时,我们需要用override关键字前缀方法定义。Scala允许比java更丰富的方法重写模式,override关键字帮助Scala保持对任何给定类应使用哪种方法定义的跟踪。随着类的定义,我们定义一个的NAStatCounter的伴生对象。Scala的object关键字用于声明一个单例,可以为一个类提供的辅助方法,类似于在一个java类静态方法的定义。在这种情况下,伴生对象提供的apply方法创建了一个Nastatcounter新实例并赋值Double类型值。在Scala中,apply方法有一些特殊的语法糖,允许你在不必显式地输入它们的情况下调用它们;例如,这两行代码做的是完全相同的事情:val nastats = NAStatCounter.apply(17.29)val nastats = NAStatCounter(17.29)现在我们已经定义了NAStatCounter,让我们通过Spark shell完成它并使用load命令保存为StatsWithMissing.scala文件::load StatsWithMissing.scala...Loading StatsWithMissing.scala...import org.apache.spark.util.StatCounterdefined class NAStatCounterdefined module NAStatCounterwarning: previously defined class NAStatCounter is not a companion to object NAStatCounter.Companions must be defined together; you may wish to use :paste mode for this. 我们得到了一个警告,我们的对象在shell使用的增量编译模式中无效,但是我们可以验证一些示例的工作与我们预期的一样:val nas1 = NAStatCounter(10.0)nas1.add(2.1)val nas2 = NAStatCounter(Double.NaN)nas1.merge(nas2) 让我们用我们的新Nastatcounter类在解析好的RDD中处理Matchdata记录中的分数。每个Matchdata实例包含Array类型成绩数组。对于数组中每一项,我们希望有一个Nastatcounter实例追踪有多少值是NaN随着非缺失值的正则分布统计。给定一个数组,我们可以使用map函数来创建Nastatcounter对象数组:val arr = Array(1.0, Double.NaN, 17.29)val nas = arr.map(d => NAStatCounter(d))每条记录都是Array的RDD可以转化成每条记录都是Array的RDD。让我们来解析集群数据:val nasRDD = parsed.map(md => {md.scores.map(d => NAStatCounter(d))}) 我们现在需要一个简单的方法来聚合Arra[ Nastatcounter ]多个实例为一个数组[ Nastatcounter ]。两个长度相同的数组可以使用ZIP进行组合。这将生成两个数组中相应对元素的新数组。想象一个拉链把两个对应的齿条配对成一个固定的齿条。这可以通过一个map的方法,使用Nastatcounter类merge函数结将一个数据对象合成单实例:val nas1 = Array(1.0, Double.NaN).map(d => NAStatCounter(d))val nas2 = Array(Double.NaN, 2.0).map(d => NAStatCounter(d))val merged = nas1.zip(nas2).map(p => p._1.merge(p._2)) 我们甚至可以使用Scala case语法将压缩数组元素对分解成可读性更强的值,而不是元组类的_1和_2方法:val merged = nas1.zip(nas2).map { case (a, b) => a.merge(b) } 在Scala集合在对所有的记录执行这一合并操作,我们可以用reduce函数,它将map类型的两个参数T转化为单个T类型作为返回值,并一遍又一遍应用于的对所有集合中的元素,将所有的值合并在一起。由于合并逻辑我们上面写的是关联的,我们可以用reduce方法集合Array值:val nas = List(nas1, nas2)val merged = nas.reduce((n1, n2) => {n1.zip(n2).map { case (a, b) => a.merge(b) }}) RDD也有与Scala集合功能相同reduce方法,只不过是应用在分布式集群上的。以下是我们使用Spark为List]写的代码:val reduced = nasRDD.reduce((n1, n2) => {n1.zip(n2).map { case (a, b) => a.merge(b) }})reduced.foreach(println)...stats: (count: 5748125, mean: 0.7129, stdev: 0.3887, max: 1.0, min: 0.0) NaN: 1007stats: (count: 103698, mean: 0.9000, stdev: 0.2713, max: 1.0, min: 0.0) NaN: 5645434stats: (count: 5749132, mean: 0.3156, stdev: 0.3342, max: 1.0, min: 0.0) NaN: 0stats: (count: 2464, mean: 0.3184, stdev: 0.3684, max: 1.0, min: 0.0) NaN: 5746668stats: (count: 5749132, mean: 0.9550, stdev: 0.2073, max: 1.0, min: 0.0) NaN: 0stats: (count: 5748337, mean: 0.2244, stdev: 0.4172, max: 1.0, min: 0.0) NaN: 795stats: (count: 5748337, mean: 0.4888, stdev: 0.4998, max: 1.0, min: 0.0) NaN: 795stats: (count: 5748337, mean: 0.2227, stdev: 0.4160, max: 1.0, min: 0.0) NaN: 795stats: (count: 5736289, mean: 0.0055, stdev: 0.0741, max: 1.0, min: 0.0) NaN: 12843 我们可将丢失值分析代码封装成一个功能保存在StatsWithMissing.scala文件中,它可以让我们来计算这些任何带有RDD ]的统计通过编辑文件来包含该代码块:import org.apache.spark.rdd.RDDdef statsWithMissing(rdd: RDD]): Array = {val nastats = rdd.mapPartitions((iter: Iterator]) => {val nas: Array = iter.next().map(d => NAStatCounter(d))iter.foreach(arr => {nas.zip(arr).foreach { case (n, d) => n.add(d) }})Iterator(nas)})nastats.reduce((n1, n2) => {n1.zip(n2).map { case (a, b) => a.merge(b) }})} 对输入RDD每条记录来说不是调用map函数生成一个Array,而是调用更高阶的函数mapPartitions,这使我们能够通过迭代Array处理输入RDD]所有的记录分区。这使我们能够为每个分区的数据创建一个Array[ Nastatcounter ]实例,然后更新它的状态,由给定迭代器返回的Array值,这是更有效的实现。事实上,我们的statsWithMissing方法现在非常类似于为RDD实例用Spark开发实现的stats方法。
感谢分享
页:
[1]