分享

Spark:你还没用SparkSession吗?已经out了

问题导读

1.Spark2入门是哪个类?
2.为何要产生Spark-Session?
3.Spark-Session如何使用?
关注最新经典文章,欢迎关注公众号


看到很多同学还在使用spark1的相关api,这里将讨论Spark-Session。

为何需要Spark-Session
在理解spark-session之前让我们理解入口点,一个入口点是控制从操作系统传递到提供的程序的地方。 在2.0入口之前,spark-core是sparkContext.Apache Spark是一个功能强大的集群计算引擎,因此它专为快速计算大数据而设计。

SaprkContext在Apache Spark:


spark-context_s-functions-01.jpg

这是任何Spark驱动程序应用程序生成SparkContext的重要步骤。 它允许spark应用程序在资源管理器的帮助下访问Spark集群。 资源管理器可以是以下三种之一:
  • SparkStandalone
  • YARN
  • Apache Mesos

SparkContext在Apache Spark中的功能:
  • 获取spark应用程序的当前状态
  • 设置配置
  • 访问各种服务
  • 取消job
  • 取消一个stage
  • 关闭清洁
  • 注册Spark-Listener
  • 可编程动态分配
  • 访问持久性RDD


在spark 2.0之前,SparkContext被用作访问所有spark功能的通道。 spark驱动程序使用sparkContext通过资源管理器连接到集群。

SparkConf是创建spark上下文对象所必需的,它存储配置参数,如appName(用于标识spark驱动程序),core的数目和在工作节点上运行的执行程序的内存大小。

为了使用SQL,Hive和Streaming,需要创建单独的上下文。

例:

  1. val conf = new SparkConf()
  2.   .setMaster("local")
  3.   .setAppName("Spark Practice")
  4. val sc = new SparkContext(conf)
复制代码


SparkSession - Spark的新入口点


introduction-to-apache-spark-20-12-638.jpg

众所周知,在以前的版本中,sparkcontext 是spark的入口点,因为RDD是主要的API,它是使用上下文API创建和操作的。 对于每个其他API,我们需要使用不同的context。

对于流式传输,我们需要streamingContext。 对于SQL sqlContext和hive hiveContext.,因为dataSet和DataFrame API正在成为新的独立API,我们需要为它们构建入口点。 因此在spark 2.0中,我们为DataSet和DataFrame API创建了一个新的入口点构建,称为Spark-Session。


jumpstart-on-apache-spark-22-on-databricks-40-638.jpg

它是SQLContext,HiveContext和未来的streamingContext的组合。 在这些context中可用的所有API都可以在SparkSession上获得,SparkSession也有实际计算的spark context

spark-sql-sessionstate.png

现在我们看看如何创建Spark Session及如何交互。
  1. val spark = SparkSession.builder()
  2.   .master("local")
  3.   .appName("example of SparkSession")
  4.   .config("spark.some.config.option", "some-value")
  5.   .getOrCreate()
复制代码


SparkSession.builder()

创建此方法用于构造SparkSession。


master(“local”)

设置要连接的master URL,例如:

“local”在本地运行
“local[4]”以4核在本地运行
“spark://master:7077”在spark独立集群上运行


appName( )

设置将在spark Web UI中显示的应用程序的名称。
如果未设置应用程序名称,则将使用随机生成的名称。


Config
设置使用此方法设置的配置选项会自动传递到'SparkConf'和'SparkSession'自己的配置,它的参数由键值对组成。


GetOrElse
获取现有的SparkSession,或者,如果存在有效的线程本地SparkSession,如果是,则返回该SparkSession。 然后它检查是否存在有效的全局默认SparkSession,如果是,则返回该值。 如果不存在有效的全局SparkSession,则该方法将创建新的SparkSession并将新创建的SparkSession指定为全局默认值。

如果返回现有SparkSession,则此构建器中指定的config选项将应用于现有SparkSession

以上类似于使用local创建SparkContext并创建封装它的SQLContext。 如果需要创建hive context ,可以使用下面的代码创建带有hive支持的spark session :
  1. val spark = SparkSession.builder()
  2. .master("local")
  3. .master("local")
  4. .appName("example of SparkSession")
  5. .config("spark.some.config.option", "some-value")
  6. .enableHiveSupport()
  7. .getOrCreate()
复制代码

enableHiveSupport启用Hive支持,类似于HiveContext
创建了sparkSession,我们可以用它来读取数据。


使用SparkSession读取数据

SparkSession是读取数据的入口点,类似于旧的SQLContext.read。
以下代码使用SparkSession从CSV读取数据:

  1. val df = spark.read.format("com.databricks.spark.csv")
  2.                 .schema(customSchema)
  3.                   .load("data.csv")
复制代码

从Spark 2.0.0开始,最好使用SparkSession,因为它提供了对sparkContext所具有的所有spark功能的访问。 此外,它还提供了用于处理DataFrame和DataSet的API

运行SQL查询
SparkSession可用于对数据执行SQL查询,将结果作为Data-Frame(即数据集[ROW])返回。
  1. display(spark.sql("Select * from TimeStamp"))
  2. +--------------------+-----------+----------+-----+
  3. |           TimeStamp|Temperature|      date| Time|
  4. +--------------------+-----------+----------+-----+
  5. |2010-02-25T05:42:...|      79.48|2010-02-25|05:42|
  6. |2010-02-25T05:42:...|      59.27|2010-02-25|05:42|
  7. |2010-02-25T05:42:...|      97.98|2010-02-25|05:42|
  8. |2010-02-25T05:42:...|      91.41|2010-02-25|05:42|
  9. |2010-02-25T05:42:...|      60.67|2010-02-25|05:42|
  10. |2010-02-25T05:42:...|      61.41|2010-02-25|05:42|
  11. |2010-02-25T05:42:...|       93.6|2010-02-25|05:42|
  12. |2010-02-25T05:42:...|      50.32|2010-02-25|05:42|
  13. |2010-02-25T05:42:...|      64.69|2010-02-25|05:42|
  14. |2010-02-25T05:42:...|      78.57|2010-02-25|05:42|
  15. |2010-02-25T05:42:...|      66.89|2010-02-25|05:42|
  16. |2010-02-25T05:42:...|      62.87|2010-02-25|05:42|
  17. |2010-02-25T05:42:...|      74.32|2010-02-25|05:42|
  18. |2010-02-25T05:42:...|      96.55|2010-02-25|05:42|
  19. |2010-02-25T05:42:...|      71.93|2010-02-25|05:42|
  20. |2010-02-25T05:42:...|      79.17|2010-02-25|05:42|
  21. |2010-02-25T05:42:...|      73.89|2010-02-25|05:42|
  22. |2010-02-25T05:42:...|      80.97|2010-02-25|05:42|
  23. |2010-02-25T05:42:...|      81.04|2010-02-25|05:42|
  24. |2010-02-25T05:42:...|      53.05|2010-02-25|05:42|
  25. +--------------------+-----------+----------+-----+
  26. only showing top 20 rows
复制代码


使用配置选项
SparkSession还可用于设置运行时配置选项,这些选项可以切换优化器行为或I / O(即Hadoop)行为。

Spark.conf.get(“Spark.Some.config”,” ABCD”)

Spark.conf.get(“Spark.Some.config”)

和配置选项集也可以在SQL中使用变量替换

%Sql select “$ {spark.some.config}”


直接使用元数据

SparkSession还包括一个目录(catalog )方法,其中包含使用Metastore(即数据目录)的方法。 方法返回数据集,以便可以使用相同的数据集API来使用它们。

1.获取当前数据库中的表列表

val tables =spark.catalog.listTables()
display(tables)

  1. +----+--------+-----------+---------+-----------+
  2. |name|database|description|tableType|isTemporary|
  3. +----+--------+-----------+---------+-----------+
  4. |Stu |default |null       |Managed  |false      |
  5. +----+--------+-----------+---------+-----------+
复制代码


2.使用dataset  API filter 名称
display(tables.filter(_.name contains “son”)))
  1. +----+--------+-----------+---------+-----------+
  2. |name|database|description|tableType|isTemporary|
  3. +----+--------+-----------+---------+-----------+
  4. |Stu |default |null       |Managed  |false      |
  5. +----+--------+-----------+---------+-----------+
复制代码



获取表的列列表
display(spark.catalog.listColumns(“smart”))
  1. +-----+----------+----------+-----------+-------------+--------+
  2. |name |description|dataType |nullable   |isPartitioned|isbucket|
  3. +-----+-----------+---------+-----------+-------------+--------+
  4. |email|null       |string   |true       |false        |false   |
  5. +-----+-----------+---------+-----------+-------------+--------+
  6. |iq   |null       |bigInt   |true       |false        |false   |
  7. +-----+-----------+---------+-----------+-------------+--------+
复制代码



访问底层的SparkContext
SparkSession.sparkContext返回基础sparkContext,用于创建RDD以及管理群集资源。
Spark.sparkContext
  1. res17: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2debe9ac
复制代码



可以查看以下完整代码:

SparkContext-demo
https://github.com/knoldus/sparkConf-demo

sparkSession-demo
https://github.com/knoldus/sparkSession-demo



本帖被以下淘专辑推荐:

已有(3)人评论

跳转到指定楼层
jiangzi 发表于 2018-10-16 00:37:47
应用程序生成SparkContext的重要步骤
回复

使用道具 举报

jiewuzhe02 发表于 2018-10-16 08:24:32
来看看来看看
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条