feilong 发表于 2017-10-20 08:41:37

Spark 高级分析:第二章第5,6节

本帖最后由 feilong 于 2017-10-27 09:58 编辑

问题导读

1.什么是Tuples?什么是case class?
2.什么是隐式类型转换?
3.RDD存储级别有哪几个?分别都是什么场景下使用?


static/image/hrline/4.gif



上一篇:Spark 高级分析:第二章第4节 开始学习Spark Shell和SparkContext
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22903&page=1#pid242441


第5节从客户机到集群的传输代码 Shipping Code from the Client to the Cluster

    我们刚看了用不同的方法通过Scala编写和应用函数操作数据。我们执行的所有代码都只是针对比较数组中靠前的数据,这些数据存储在客户机上。现在我们要将其应用到存储在器集群上数以万计的记录链接上,并以Spark RDD表示。
    下面是示例代码;你应该感觉到很熟悉。
    val noheader = rawblocks.filter(x => !isHeader(x))
    我们用来对集群上的整个数据集表示过滤计算的语法与我们在本地机器上对头部数据数组表示过滤计算的语法完全相同。我们可以使用first方法操作非头部RDD 验证过滤规则是否正确运行。

    noheader.first
    ...
    res: String = 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
    这点很强大,这意味着我们可以对少量样本数据,交互开发和调试修改我们的代码,我们从集群的数据,然后,当我们准备转换整个数据集时,使用我们在本地使用的相同的代码和语法,将该代码传送到集群中,将其应用到整个数据集,并且我们从未离开shell来完成它。真的没有其他工具能给你这种体验。

    在下面几节中,我们将使用这个混合的本地开发和测试和集群计算进行更多的修改和分析记录数据,但如果你需要一点时间来欣赏令人惊叹的新世界,你刚刚进入,我们当然理解。

第6节 用Tuples和case类构造数据 Structuring Data with Tuples and Case Classes

    现在,头部数组和非头部RDD中都是都好分割的字符串。为了更好地分析数据,需要将这些字符串格式化,将不同的字段转换为正确的数据类型,例如整型或者双精度型。

    如果我们查看头部数组中的内容就会发现所有的记录都是如下数据格式:
    1. 开始两个字段是整型的ID,表示患者。
    2. 后续的九个值(可能有缺失)是双精度型,表示患者记录不同的字段,例如名字,生日和住址。
    3. 最后一个字段是布尔型(true/false),指示由行表示的对患者记录是否匹配。

    同Python一样,Scala内置了tuple类型,我们可以用其创建成对的,三元组,和更大的不同类型数据组成的集合作为表示记录的简单方式。让我们暂时来将每一行的内容解析到有四个值的一个元组:第一个病人ID,第二病人ID,匹配分值的长度为9的双精度数组(NaN代表缺失的值),和一个布尔值,指示是否字段匹配。

    与Python不同的是,Scala没有内置解析都好分割字符串的方法,我们需要自己做一些工作。我们可以使用Scala REPL实验我们的解析代码。首先从头部数组获取一条记录:
   
    val line = head(5)
    val pieces = line.split(',')
    ...
    pieces: Array = Array(36950, 42116, 1, ?,...
    获得数组元素使用圆括号代替中括号;Scala中,访问数组元素是函数调用而不是一个运算符。Scala允许类定义一个名为apply的特殊函数,当我们将一个对象当做函数处理,所以head(5)和head.apply(5)是一样的。
使用Java中String类的split方法将数据分割,返回一个Array类型数组,将之声明为pieces。现在我们要将pieces中的每个元素使用Scala中的类型转换函数转化为正确的类型:

    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val matched = pieces(11).toBoolean
    转换id变量和匹配的布尔变量非常简单,只要我们知道合适的toXYZ转换函数。与contains和split方法不同,toInt和toBoolean不是Java的String类中定义的方法。相反,它们是在ScalaStringOps类中定义的,使用了Scala的一个更强大的(可以说是有点危险)特征:隐式类型转换。隐式转换是这样工作的:如果你调用一个Scala对象的方法,Scala编译器不看对象的类定义中的方法定义,编译器会把对象尝试转换为类中没有定义其方法的类的实例。在这种情况下,编译器会发现java的String类没有Toint方法的定义,但StringOps中有,StringOps类有一个可以转换String类的实例为stringops类实例的方法。编译器隐式地将String对象转换成一个StringOps对象,然后调用新对象的Toint方法。

    用Scala编写库(包括核心Spark开发人员)的开发人员非常喜欢隐式类型转换;它允许他们增强核心类的功能,比如String,否则会被关闭修改。作为这些工具的用户,隐式类型转换更像是一个混杂的包,因为它们很难准确地确定一个特定的类方法是在什么地方定义的。尽管如此,我们将在整个学习过程中遇到隐式转换,所以最好现在就习惯它们。

    我们仍要将9个分值字段转换为双精度值。为了转换它们,使用Scala Array类的slice方法获得一个数组子集,然后利用map高阶函数将每个元素由String转化为Double类型:

    val rawscores = pieces.slice(2, 11)
    rawscores.map(s => s.toDouble)
    ...
    java.lang.NumberFormatException: For input string: "?"
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1241)
    at java.lang.Double.parseDouble(Double.java:540)
    ...
    我们忘了可能有”?”实体在源数组中,toDouble方法不能将其转化为Double类型。我们来自己写一个函数实现遇到“?”返回NaN,然后应用于原数组:
   
   def toDouble(s: String) = {
    if ("?".equals(s)) Double.NaN else s.toDouble
    }
    val scores = rawscores.map(toDouble)
    scores: Array = Array(1.0, NaN, 1.0, 1.0, ...
    现在好很多了。我们再将解析代码合并成一个函数,返回值为带有所有解析值的元组:
   
    def parse(line: String) = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2, 11).map(toDouble)
    val matched = pieces(11).toBoolean
    (id1, id2, scores, matched)
    }
    val tup = parse(line)
    我们可以通过位置函数在元组中检索每个字段的值,通过_1或者productElement方法,下标从0开始。也可以用productArity方法获取元组的大小:
   
    tup._1
    tup.productElement(0)
    tup.productArity
    尽管使用Scala创建元组很简洁方便,元素位置也容易确定,但没有意义的命名仍会让我们的代码难以理解。我们更希望通过名字来确定记录中的字段而不是位置。很幸运,Scala提供了这样便利的语法case 类。一个case类是一个简单的不可变类,配备了所有基本的java类的方法,例如toString,equals和hashCode,这使它们非常容易使用。让我们为记录链接数据声明一个case类:
   
    case class MatchData(id1: Int, id2: Int,
    scores: Array, matched: Boolean)
    现在我们修改解析方法返回值为MatchData类而不再是元组:
   
    def parse(line: String) = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2, 11).map(toDouble)
    val matched = pieces(11).toBoolean
    MatchData(id1, id2, scores, matched)
    }
    val md = parse(line)
    有两点需要注意:1,创建我们的case类实例(Scala讨厌的事情的有一个例子)声明MatchData时前不需要加new关键字。2,MatchData内建了对每个字段toString方法的实现,除了scaores,它使用toString方法获取的是Java双精度数组。
通过名字获取MatchData字段如下:
   
    md.matched
    md.id1
    现在我们可在单个记录上测试我们的解析函数,让我们将它应用到头部数组中的所有元素中,除了第一条之外:
    val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
    起作用了。让我们嗲用map函数将之应用到集群RDD上:
    val parsed = noheader.map(line => parse(line))
    与我们本地生成的mds数组不同,解析函数并没有真正地应用于集群数据。一旦我们调用解析过的RDD就会有输出,解析函数会被应用于转换noheader RDD中每个字符串为MatchData类实例。如果我们再次调用函数解析RDD会生成不同的输出,解析函数会再次作用于输入数据。
这样我们的集群资源不会被充分利用;一旦数据被输入一次,我们想在集群上保存其解析的形式,这样遇到数据问题我们不需要每次都重新分析它。Spark支持这个用例,允许我们告知一个给定的RDD应该被缓存在内存中,通过实例调用cache方法生成。我们对解析好的RDD这样做:
    parsed.cache()

Caching   缓存

RDDS内容默认情况下是暂时的,而Spark提供了一种机制,存留数据在RDD中。第一次action操作需要计算这样一个RDD的内容,然后将计算结果存储在内存或集群磁盘中。下一次的依赖于该RDD的action操作不需要从其依赖重新计算。它的数据直接从缓存分区返回。

cached.cache()
cached.count()
cached.take(10)
Cache的调用表明RDD应存放下一次的计算。调用count最初计算它。action操作将返回前10个元素的RDD作为局部阵列。当被调用时,它访问缓存的元素而不是从依赖重新计算。

Spark定义了不同的机制或者存储级别来留存RDD。rdd.cache()是rdd.persist(StorageLevel.MEMORY)的简写,存储RDD为非序列化Java对象。当Spark估计一个分区不适合在内存中,它将不会保存它,它会重新计算下一次需要的内存大小。当对象被频繁引用和/或需要低延迟访问时,这个级别是最有意义的,因为它避免了任何序列化开销。它的缺点是占用的内存比其他选择要大。同时,许多小对象对Java的垃圾回收造成压力,从而导致停顿和延迟。
Spark也暴露出MEMORY_SER存储级别,分配大字节缓冲区内存和序列化RDD内容融入其中。当使用正确的格式(更多关于下面的内容)时,序列化的数据通常比原始数据的空间少两到五倍。

Spark还可以使用磁盘缓存RDD。MEMORY_AND_DISK和备MEMORY_AND_DISK_SER类似于MEMORY和MEMORY_SER存储级别。对于后者,如果一个分区不适合在内存中,它不会被存储,这意味着必须重新计算其依赖当下一个action操作使用它时。对于前者,Spark将溢出分区到磁盘。
决定何时缓存数据是一门艺术。这个决定通常涉及到空间和速度之间的权衡,垃圾收集的幽灵在头顶上若隐若现,有时会把事情搞得更复杂。总的来说,RDDs应该是当他们有可能被多个action操作引用和再生成本高时缓存数据。



美丽天空 发表于 2017-10-21 10:52:33

感谢分享

hivetodo 发表于 2018-2-27 16:05:40

学习一下

a530491093 发表于 2019-1-17 09:52:04

来过,感谢分享!
页: [1]
查看完整版本: Spark 高级分析:第二章第5,6节