spark core组件:RDD、DataFrame和DataSet介绍、场景与比较

查看数: 43718 | 评论数: 6 | 收藏 11
关灯 | 提示:支持键盘翻页<-左 右->
    组图打开中,请稍候......
发布时间: 2017-2-3 20:20

正文摘要:

问题导读 1.rdd,dataframe,dataset在哪个版本被引入? 2.什么情况下使用rdd,dataframe,dataset? 3.它们有什么不同? spark生态系统中,Spark Core,包括各种Spark的各种核心组件,它们能够对内存和 ...

回复

yuwenge 发表于 2017-2-4 21:03:03
Spark Core提供Spark最基础与最核心的功能,主要包括以下功能:
(1)SparkContext:通常而言,Driver Application的执行与输出都是通过SparkContext来完成的。在正式提交Application之前,首先需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储能力、计算能力、缓存、测量系统、文件服务、Web服务等内容,应用程序开发者只需要使用SparkContext提供的API完成功能开发。SparkContext内置的DAGScheduler负责创建Job,将DAG中的RDD划分到不同的Stage,提交Stage等功能。内置的TaskScheduler负责资源的申请,任务的提交及请求集群对任务的调度等工作。
  (2)存储体系:Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘IO,提升了任务执行的效率,使得Spark适用于实时计算、流式计算等场景。此外,Spark还提供了以内存为中心的高容错的分布式文件系统Tachyon供用户进行选择。Tachyon能够为Spark提供可靠的内存级的文件共享服务。
  (3)计算引擎:计算引擎由SparkContext中的DAGScheduler、RDD以及具体节点上的Executor负责执行的Map和Reduce任务组成。DAGScheduler和RDD虽然位于SparkContext内部,但是在任务正式提交与执行之前会将Job中的RDD组织成有向无环图(DAG),并对Stage进行划分,决定了任务执行阶段任务的数量、迭代计算、shuffle等过程。
  (4)部署模式:由于单节点不足以提供足够的存储和计算能力,所以作为大数据处理的Spark在SparkContext的TaskScheduler组件中提供了对Standalone部署模式的实现和Yarn、Mesos等分布式资源管理系统的支持。通过使用Standalone、Yarn、Mesos等部署模式为Task分配计算资源,提高任务的并发执行效率。
引自:wanglg_perfect

aboutnuy 发表于 2017-2-23 09:03:25
需要学习一下
ggggying12 发表于 2017-2-21 20:52:25
学习了。。。。
ggggying12 发表于 2017-2-21 17:28:23
学习了
regan 发表于 2017-2-6 11:31:10
谢谢谢分享
yuwenge 发表于 2017-2-4 17:46:12
进一步补充

RDD
优点:
  • 编译时类型安全
    编译时就能检查出类型错误
  • 面向对象的编程风格
    直接通过类名点的方式来操作数据
缺点:
  • 序列化和反序列化的性能开销
    无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化.
  • GC的性能开销
    频繁的创建和销毁对象, 势必会增加GC
[mw_shl_code=scala,true]import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Run {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val sqlContext = new SQLContext(sc)

    /**
      * id      age
      * 1       30
      * 2       29
      * 3       21
      */
    case class Person(id: Int, age: Int)
    val idAgeRDDPerson = sc.parallelize(Array(Person(1, 30), Person(2, 29), Person(3, 21)))

    // 优点1
    // idAge.filter(_.age > "") // 编译时报错, int不能跟String比

    // 优点2
    idAgeRDDPerson.filter(_.age > 25) // 直接操作一个个的person对象
  }
}[/mw_shl_code]



DataFrame
DataFrame引入了schema和off-heap
  • schema : RDD每一行的数据, 结构都是一样的. 这个结构就存储在schema中. Spark通过schame就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了.
  • off-heap : 意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)。Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据时, 就直接操作off-heap内存. 由于Spark理解schema, 所以知道该如何操作.

off-heap就像地盘, schema就像地图, Spark有地图又有自己地盘了, 就可以自己说了算了, 不再受JVM的限制, 也就不再收GC的困扰了.
通过schema和off-heap, DataFrame解决了RDD的缺点, 但是却丢了RDD的优点. DataFrame不是类型安全的, API也不是面向对象风格的.

[mw_shl_code=scala,true]import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Run {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val sqlContext = new SQLContext(sc)
    /**
      * id      age
      * 1       30
      * 2       29
      * 3       21
      */
    val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))

    val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))

    val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)
    // API不是面向对象的
    idAgeDF.filter(idAgeDF.col("age") > 25)
    // 不会报错, DataFrame不是编译时类型安全的
    idAgeDF.filter(idAgeDF.col("age") > "")
  }
}[/mw_shl_code]


DataSet
DataSet结合了RDD和DataFrame的优点, 并带来的一个新的概念Encoder
当序列化数据时, Encoder产生字节码与off-heap进行交互, 能够达到按需访问数据的效果, 而不用反序列化整个对象. Spark还没有提供自定义Encoder的API, 但是未来会加入.
下面看DataFrame和DataSet在2.0.0-preview中的实现
下面这段代码, 在1.6.x中创建的是DataFrame

[mw_shl_code=scala,true]// 上文DataFrame示例中提取出来的
val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))

val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))

val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)[/mw_shl_code]

但是同样的代码在2.0.0-preview中, 创建的虽然还叫DataFrame
[mw_shl_code=scala,true]// sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的实现, 返回值依然是DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
sparkSession.createDataFrame(rowRDD, schema)
}[/mw_shl_code]


但是其实却是DataSet, 因为DataFrame被声明为Dataset[Row]
[mw_shl_code=scala,true]package object sql {
  // ...省略了不相关的代码

  type DataFrame = Dataset[Row]
}[/mw_shl_code]


因此当我们从1.6.x迁移到2.0.0的时候, 无需任何修改就直接用上了DataSet.

下面是一段DataSet的示例代码

[mw_shl_code=scala,true]import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local") // 调试的时候一定不要用local

  •     val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))

        val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))

        // 在2.0.0-preview中这行代码创建出的DataFrame, 其实是DataSet[Row]
        val idAgeDS = sqlContext.createDataFrame(idAgeRDDRow, schema)

        // 在2.0.0-preview中, 还不支持自定的Encoder, Row类型不行, 自定义的bean也不行
        // 官方文档也有写通过bean创建Dataset的例子,但是我运行时并不能成功
        // 所以目前需要用创建DataFrame的方法, 来创建DataSet[Row]
        // sqlContext.createDataset(idAgeRDDRow)

        // 目前支持String, Integer, Long等类型直接创建Dataset
        Seq(1, 2, 3).toDS().show()
        sqlContext.createDataset(sc.parallelize(Array(1, 2, 3))).show()
      }
    }[/mw_shl_code]



    出处:小帆的帆的专栏


  • 关闭

    推荐上一条 /2 下一条