Spark Core提供Spark最基础与最核心的功能,主要包括以下功能: (1)SparkContext:通常而言,Driver Application的执行与输出都是通过SparkContext来完成的。在正式提交Application之前,首先需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储能力、计算能力、缓存、测量系统、文件服务、Web服务等内容,应用程序开发者只需要使用SparkContext提供的API完成功能开发。SparkContext内置的DAGScheduler负责创建Job,将DAG中的RDD划分到不同的Stage,提交Stage等功能。内置的TaskScheduler负责资源的申请,任务的提交及请求集群对任务的调度等工作。 (2)存储体系:Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘IO,提升了任务执行的效率,使得Spark适用于实时计算、流式计算等场景。此外,Spark还提供了以内存为中心的高容错的分布式文件系统Tachyon供用户进行选择。Tachyon能够为Spark提供可靠的内存级的文件共享服务。 (3)计算引擎:计算引擎由SparkContext中的DAGScheduler、RDD以及具体节点上的Executor负责执行的Map和Reduce任务组成。DAGScheduler和RDD虽然位于SparkContext内部,但是在任务正式提交与执行之前会将Job中的RDD组织成有向无环图(DAG),并对Stage进行划分,决定了任务执行阶段任务的数量、迭代计算、shuffle等过程。 (4)部署模式:由于单节点不足以提供足够的存储和计算能力,所以作为大数据处理的Spark在SparkContext的TaskScheduler组件中提供了对Standalone部署模式的实现和Yarn、Mesos等分布式资源管理系统的支持。通过使用Standalone、Yarn、Mesos等部署模式为Task分配计算资源,提高任务的并发执行效率。 引自:wanglg_perfect |
需要学习一下 |
学习了。。。。 |
学习了 |
谢谢谢分享 |
进一步补充 RDD 优点:
缺点:
import org.apache.spark.{SparkConf, SparkContext} object Run { def main(args: Array[String]) { val conf = new SparkConf().setAppName("test").setMaster("local") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val sqlContext = new SQLContext(sc) /** * id age * 1 30 * 2 29 * 3 21 */ case class Person(id: Int, age: Int) val idAgeRDDPerson = sc.parallelize(Array(Person(1, 30), Person(2, 29), Person(3, 21))) // 优点1 // idAge.filter(_.age > "") // 编译时报错, int不能跟String比 // 优点2 idAgeRDDPerson.filter(_.age > 25) // 直接操作一个个的person对象 } }[/mw_shl_code] DataFrame DataFrame引入了schema和off-heap
off-heap就像地盘, schema就像地图, Spark有地图又有自己地盘了, 就可以自己说了算了, 不再受JVM的限制, 也就不再收GC的困扰了. 通过schema和off-heap, DataFrame解决了RDD的缺点, 但是却丢了RDD的优点. DataFrame不是类型安全的, API也不是面向对象风格的. [mw_shl_code=scala,true]import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object Run { def main(args: Array[String]) { val conf = new SparkConf().setAppName("test").setMaster("local") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val sqlContext = new SQLContext(sc) /** * id age * 1 30 * 2 29 * 3 21 */ val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21))) val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType))) val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema) // API不是面向对象的 idAgeDF.filter(idAgeDF.col("age") > 25) // 不会报错, DataFrame不是编译时类型安全的 idAgeDF.filter(idAgeDF.col("age") > "") } }[/mw_shl_code] DataSet DataSet结合了RDD和DataFrame的优点, 并带来的一个新的概念Encoder 当序列化数据时, Encoder产生字节码与off-heap进行交互, 能够达到按需访问数据的效果, 而不用反序列化整个对象. Spark还没有提供自定义Encoder的API, 但是未来会加入. 下面看DataFrame和DataSet在2.0.0-preview中的实现 下面这段代码, 在1.6.x中创建的是DataFrame [mw_shl_code=scala,true]// 上文DataFrame示例中提取出来的 val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21))) val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType))) val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)[/mw_shl_code] 但是同样的代码在2.0.0-preview中, 创建的虽然还叫DataFrame [mw_shl_code=scala,true]// sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的实现, 返回值依然是DataFrame def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = { sparkSession.createDataFrame(rowRDD, schema) }[/mw_shl_code] 但是其实却是DataSet, 因为DataFrame被声明为Dataset[Row] [mw_shl_code=scala,true]package object sql { // ...省略了不相关的代码 type DataFrame = Dataset[Row] }[/mw_shl_code] 因此当我们从1.6.x迁移到2.0.0的时候, 无需任何修改就直接用上了DataSet. 下面是一段DataSet的示例代码 [mw_shl_code=scala,true]import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object Test { def main(args: Array[String]) { val conf = new SparkConf().setAppName("test").setMaster("local") // 调试的时候一定不要用local val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21))) val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType))) // 在2.0.0-preview中这行代码创建出的DataFrame, 其实是DataSet[Row] val idAgeDS = sqlContext.createDataFrame(idAgeRDDRow, schema) // 在2.0.0-preview中, 还不支持自定的Encoder, Row类型不行, 自定义的bean也不行 // 官方文档也有写通过bean创建Dataset的例子,但是我运行时并不能成功 // 所以目前需要用创建DataFrame的方法, 来创建DataSet[Row] // sqlContext.createDataset(idAgeRDDRow) // 目前支持String, Integer, Long等类型直接创建Dataset Seq(1, 2, 3).toDS().show() sqlContext.createDataset(sc.parallelize(Array(1, 2, 3))).show() } }[/mw_shl_code] 出处:小帆的帆的专栏 |