分享

Spark 高级分析:第二章第9,10节

feilong 2017-11-3 09:08:45 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 5709
本帖最后由 feilong 于 2017-11-3 09:10 编辑

问题导读


1.哪些场景使用以及如何使用map方法?
2.apply方法有何作用?
3.reduce方法有何作用?






上一篇:Spark 高级分析:第二章第7,8节
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23197&extra=


第9节 Summary Statistics For Continuous Variables  连续变量的汇总统计

  Spark的countByValue 操作是创造相对较低的基数分类变量数据直方图的一个好方法。但是对于连续变量,如患者记录中每个字段的匹配分值,我们希望能够快速得到关于它们分布的基本统计数据集,如平均值、标准差和极值值,如最大值和最小值。

    对于RDD [Double]实例,Spark API提供了通过隐式类型转换action的一个额外的设置李类似于toInt方法用于String类。这些内隐行为允许我们将以有用的方式扩展RDD的功能,当我们有更多关于如何处理它所包含的值的信息时。

Pair RDDs
    此外,RDD [Double]隐式action,Spark支持RDD [ tuple2 [K,V ] ]类型隐式类型转换,提类似于groupbykey和reducebykey供关键的聚合方法,以及复数个RDD按相同类型key join的方法。

    RDD [Double]隐式函数ststs会提供我们需要的RDD统计值。可以在解析好的RDD中的MatchData记录中的数组第一个值尝试这个函数:
[mw_shl_code=scala,true]parsed.map(md => md.scores(0)).stats()
StatCounter = (count: 5749132, mean: NaN, stdev: NaN, max: NaN, min: NaN)[/mw_shl_code]
    不幸的是,使用在我们的数组的丢失的占位符NaN值干扰了Spark的汇总统计。更可惜的是,Spark目前没有一个很好的方式排除与/或计数的缺失值,所以我们必须Java的Double isNaN函数手动过滤掉使用:
[mw_shl_code=scala,true]import java.lang.Double.isNaN
parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
StatCounter = (count: 5748125, mean: 0.7129, stdev: 0.3887, max: 1.0, min: 0.0)[/mw_shl_code]
    如果我们倾向于这样,我们就可以用这种方式得到分数数组中所有值的统计信息,使用Scala的Rang构造创建一个循环,遍历每个索引值并计算列的统计数据,如下所示:
[mw_shl_code=scala,true]val stats = (0 until 9).map(i => {
parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats()
})
stats(1)
...
StatCounter = (count: 103698, mean: 0.9000, stdev: 0.2713, max: 1.0, min: 0.0)
stats(8)
...
StatCounter = (count: 5736289, mean: 0.0055, stdev: 0.0741, max: 1.0, min: 0.0)[/mw_shl_code]

第10节 Creating Reusable Code For Computing Summary Statistics  创建用于计算汇总统计的可重用代码

    这种方法虽然能够完成任务,但是很低效的;我们必须重新处理解析好的RDD中所有的记录九次为了计算所有的统计。当我们的数据集越来越大时,重新处理所有数据的成本不断上升,即使我们在内存中缓存了中间结果,以节省一些处理时间。当我们开发带有Spark的分布式算法时,我们可以花些时间来研究如何计算。我们可需要的答案能是尽可能少地传递数据。在这种情况下,我们想出一个办法来写一个函数,将接收任何RDD [Array[Double] ]类型输入,返回一个包括a)各指标缺失值的数量,b)与每个指标的非缺失值的汇总统计StatCounter的对象的数组。

    每当我们期望我们需要执行的一些分析任务一次又一次地有用时,花一些时间开发我们的代码是很有价值的,这样一来,其他分析人员就可以很容易地使用我们自己分析得出的解决方案。为了做到这一点,我们可以在一个单独的文件中编写Scala代码,然后我们可以将它加载到Spark Shell中进行测试和验证,一旦我们知道了它的工作原理,我们就可以与其他人共享该文件。

    这需要代码复杂度的提高。我们不需要处理一行或两行的单独方法调用和函数,而是需要创建适当的Scala类和API,这意味着使用更复杂的语言特性。

    对于缺少的值分析,我们的第一个任务是编写一个Spark计数模拟器,正确处理丢失的值。在你的客户机shell中,打开一个文件,命名为StatsWithMissing.scala,并复制下面的类定义的文件。我们将浏览下面定义的各个字段和方法。
i[mw_shl_code=scala,true]mport org.apache.spark.util.StatCounter
class NAStatCounter extends Serializable {
val stats: StatCounter = new StatCounter()
var missing: Long = 0
def add(x: Double): NAStatCounter = {
if (java.lang.Double.isNaN(x)) {
missing += 1
} else {
stats.merge(x)
}
this
}
def merge(other: NAStatCounter): NAStatCounter = {
stats.merge(other.stats)
missing += other.missing
this
}
override def toString = {
"stats: " + stats.toString + " NaN: " + missing
}
}
object NAStatCounter extends Serializable {
def apply(x: Double) = new NAStatCounter().add(x)
}[/mw_shl_code]

    我们的NAStatCounter 类有两个成员变量:一个不变的StatCounter的命名实例stats,和一个可变长变量名为missing。请注意,我们标记为可序列化类是因为我们将在Spark RDDS中使用这个类的实例,如果Spark无法序列化包含在一个RDD中的数据作业就会失败。

    类中第一个方法add,使NAStatCounter对象能够新增一个Double类型值,如果一条记录是NaN则记录数加1否则将其新增到StatCounter。merge方法合并跟踪的另一个NAStatCounter实例为当前实例的统计。这两种方法都返回this,以便它们可以很容易地链接在一起。

    最后,我们重写nastatcounter类的toString方法,让我们可以在Spark Shell把内容打印出来。每当我们从Scala中的父类重写方法时,我们需要用override关键字前缀方法定义。Scala允许比java更丰富的方法重写模式,override关键字帮助Scala保持对任何给定类应使用哪种方法定义的跟踪。
随着类的定义,我们定义一个的NAStatCounter的伴生对象。Scala的object关键字用于声明一个单例,可以为一个类提供的辅助方法,类似于在一个java类静态方法的定义。在这种情况下,伴生对象提供的apply方法创建了一个Nastatcounter新实例并赋值Double类型值。在Scala中,apply方法有一些特殊的语法糖,允许你在不必显式地输入它们的情况下调用它们;例如,这两行代码做的是完全相同的事情:
[mw_shl_code=scala,true]val nastats = NAStatCounter.apply(17.29)
val nastats = NAStatCounter(17.29)[/mw_shl_code]
现在我们已经定义了NAStatCounter,让我们通过Spark shell完成它并使用load命令保存为StatsWithMissing.scala文件:
[mw_shl_code=scala,true]:load StatsWithMissing.scala
...
Loading StatsWithMissing.scala...
import org.apache.spark.util.StatCounter
defined class NAStatCounter
defined module NAStatCounter
warning: previously defined class NAStatCounter is not a companion to object NAStatCounter.
Companions must be defined together; you may wish to use :paste mode for this.[/mw_shl_code]
    我们得到了一个警告,我们的对象在shell使用的增量编译模式中无效,但是我们可以验证一些示例的工作与我们预期的一样:
[mw_shl_code=scala,true]val nas1 = NAStatCounter(10.0)
nas1.add(2.1)
val nas2 = NAStatCounter(Double.NaN)
nas1.merge(nas2)[/mw_shl_code]
    让我们用我们的新Nastatcounter类在解析好的RDD中处理Matchdata记录中的分数。每个Matchdata实例包含Array[Double]类型成绩数组。对于数组中每一项,我们希望有一个Nastatcounter实例追踪有多少值是NaN随着非缺失值的正则分布统计。给定一个数组,我们可以使用map函数来创建Nastatcounter对象数组:
[mw_shl_code=scala,true]val arr = Array(1.0, Double.NaN, 17.29)
val nas = arr.map(d => NAStatCounter(d))
每条记录都是Array[Double]的RDD可以转化成每条记录都是Array[Nastatcounter]的RDD。让我们来解析集群数据:
val nasRDD = parsed.map(md => {
md.scores.map(d => NAStatCounter(d))
})[/mw_shl_code]
    我们现在需要一个简单的方法来聚合Arra[ Nastatcounter ]多个实例为一个数组[ Nastatcounter ]。两个长度相同的数组可以使用ZIP进行组合。这将生成两个数组中相应对元素的新数组。想象一个拉链把两个对应的齿条配对成一个固定的齿条。这可以通过一个map的方法,使用Nastatcounter类merge函数结将一个数据对象合成单实例:
[mw_shl_code=scala,true]val nas1 = Array(1.0, Double.NaN).map(d => NAStatCounter(d))
val nas2 = Array(Double.NaN, 2.0).map(d => NAStatCounter(d))
val merged = nas1.zip(nas2).map(p => p._1.merge(p._2))[/mw_shl_code]
    我们甚至可以使用Scala case语法将压缩数组元素对分解成可读性更强的值,而不是元组类的_1和_2方法:
[mw_shl_code=scala,true]val merged = nas1.zip(nas2).map { case (a, b) => a.merge(b) }[/mw_shl_code]
    在Scala集合在对所有的记录执行这一合并操作,我们可以用reduce函数,它将map类型的两个参数T转化为单个T类型作为返回值,并一遍又一遍应用于的对所有集合中的元素,将所有的值合并在一起。由于合并逻辑我们上面写的是关联的,我们可以用reduce方法集合Array[Nastatcounter ]值:
[mw_shl_code=scala,true]val nas = List(nas1, nas2)
val merged = nas.reduce((n1, n2) => {
n1.zip(n2).map { case (a, b) => a.merge(b) }
})[/mw_shl_code]
    RDD也有与Scala集合功能相同reduce方法,只不过是应用在分布式集群上的。以下是我们使用Spark为List[Array[NAStatCounter]]写的代码:
[mw_shl_code=scala,true]val reduced = nasRDD.reduce((n1, n2) => {
n1.zip(n2).map { case (a, b) => a.merge(b) }
})
reduced.foreach(println)
...
stats: (count: 5748125, mean: 0.7129, stdev: 0.3887, max: 1.0, min: 0.0) NaN: 1007
stats: (count: 103698, mean: 0.9000, stdev: 0.2713, max: 1.0, min: 0.0) NaN: 5645434
stats: (count: 5749132, mean: 0.3156, stdev: 0.3342, max: 1.0, min: 0.0) NaN: 0
stats: (count: 2464, mean: 0.3184, stdev: 0.3684, max: 1.0, min: 0.0) NaN: 5746668
stats: (count: 5749132, mean: 0.9550, stdev: 0.2073, max: 1.0, min: 0.0) NaN: 0
stats: (count: 5748337, mean: 0.2244, stdev: 0.4172, max: 1.0, min: 0.0) NaN: 795
stats: (count: 5748337, mean: 0.4888, stdev: 0.4998, max: 1.0, min: 0.0) NaN: 795
stats: (count: 5748337, mean: 0.2227, stdev: 0.4160, max: 1.0, min: 0.0) NaN: 795
stats: (count: 5736289, mean: 0.0055, stdev: 0.0741, max: 1.0, min: 0.0) NaN: 12843[/mw_shl_code]
    我们可将丢失值分析代码封装成一个功能保存在StatsWithMissing.scala文件中,它可以让我们来计算这些任何带有RDD [Array[Double] ]的统计通过编辑文件来包含该代码块:
[mw_shl_code=scala,true]import org.apache.spark.rdd.RDD
def statsWithMissing(rdd: RDD[Array[Double]]): Array[NAStatCounter] = {
val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {
val nas: Array[NAStatCounter] = iter.next().map(d => NAStatCounter(d))
iter.foreach(arr => {
nas.zip(arr).foreach { case (n, d) => n.add(d) }
})
Iterator(nas)
})
nastats.reduce((n1, n2) => {
n1.zip(n2).map { case (a, b) => a.merge(b) }
})
}[/mw_shl_code]
    对输入RDD每条记录来说不是调用map函数生成一个Array[Nastatcounter ],而是调用更高阶的函数mapPartitions,这使我们能够通过迭代Array[Double]处理输入RDD[Array[Double]]所有的记录分区。这使我们能够为每个分区的数据创建一个Array[ Nastatcounter ]实例,然后更新它的状态,由给定迭代器返回的Array[Double]值,这是更有效的实现。事实上,我们的statsWithMissing方法现在非常类似于为RDD[Double]实例用Spark开发实现的stats方法。

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条