问题导读
1.Spark2入门是哪个类?
2.为何要产生Spark-Session?
3.Spark-Session如何使用?
关注最新经典文章,欢迎关注公众号
看到很多同学还在使用spark1的相关api,这里将讨论Spark-Session。
为何需要Spark-Session
在理解spark-session之前让我们理解入口点,一个入口点是控制从操作系统传递到提供的程序的地方。 在2.0入口之前,spark-core是sparkContext.Apache Spark是一个功能强大的集群计算引擎,因此它专为快速计算大数据而设计。
SaprkContext在Apache Spark:
这是任何Spark驱动程序应用程序生成SparkContext的重要步骤。 它允许spark应用程序在资源管理器的帮助下访问Spark集群。 资源管理器可以是以下三种之一:
- SparkStandalone
- YARN
- Apache Mesos
SparkContext在Apache Spark中的功能:
- 获取spark应用程序的当前状态
- 设置配置
- 访问各种服务
- 取消job
- 取消一个stage
- 关闭清洁
- 注册Spark-Listener
- 可编程动态分配
- 访问持久性RDD
在spark 2.0之前,SparkContext被用作访问所有spark功能的通道。 spark驱动程序使用sparkContext通过资源管理器连接到集群。
SparkConf是创建spark上下文对象所必需的,它存储配置参数,如appName(用于标识spark驱动程序),core的数目和在工作节点上运行的执行程序的内存大小。
为了使用SQL,Hive和Streaming,需要创建单独的上下文。
例:
- val conf = new SparkConf()
- .setMaster("local")
- .setAppName("Spark Practice")
- val sc = new SparkContext(conf)
复制代码
SparkSession - Spark的新入口点
众所周知,在以前的版本中,sparkcontext 是spark的入口点,因为RDD是主要的API,它是使用上下文API创建和操作的。 对于每个其他API,我们需要使用不同的context。
对于流式传输,我们需要streamingContext。 对于SQL sqlContext和hive hiveContext.,因为dataSet和DataFrame API正在成为新的独立API,我们需要为它们构建入口点。 因此在spark 2.0中,我们为DataSet和DataFrame API创建了一个新的入口点构建,称为Spark-Session。
它是SQLContext,HiveContext和未来的streamingContext的组合。 在这些context中可用的所有API都可以在SparkSession上获得,SparkSession也有实际计算的spark context
现在我们看看如何创建Spark Session及如何交互。
- val spark = SparkSession.builder()
- .master("local")
- .appName("example of SparkSession")
- .config("spark.some.config.option", "some-value")
- .getOrCreate()
复制代码
SparkSession.builder()
创建此方法用于构造SparkSession。
master(“local”)
设置要连接的master URL,例如:
“local”在本地运行
“local[4]”以4核在本地运行
“spark://master:7077”在spark独立集群上运行
appName( )
设置将在spark Web UI中显示的应用程序的名称。
如果未设置应用程序名称,则将使用随机生成的名称。
Config
设置使用此方法设置的配置选项会自动传递到'SparkConf'和'SparkSession'自己的配置,它的参数由键值对组成。
GetOrElse
获取现有的SparkSession,或者,如果存在有效的线程本地SparkSession,如果是,则返回该SparkSession。 然后它检查是否存在有效的全局默认SparkSession,如果是,则返回该值。 如果不存在有效的全局SparkSession,则该方法将创建新的SparkSession并将新创建的SparkSession指定为全局默认值。
如果返回现有SparkSession,则此构建器中指定的config选项将应用于现有SparkSession
以上类似于使用local创建SparkContext并创建封装它的SQLContext。 如果需要创建hive context ,可以使用下面的代码创建带有hive支持的spark session :
- val spark = SparkSession.builder()
- .master("local")
- .master("local")
- .appName("example of SparkSession")
- .config("spark.some.config.option", "some-value")
- .enableHiveSupport()
- .getOrCreate()
复制代码
enableHiveSupport启用Hive支持,类似于HiveContext
创建了sparkSession,我们可以用它来读取数据。
使用SparkSession读取数据
SparkSession是读取数据的入口点,类似于旧的SQLContext.read。
以下代码使用SparkSession从CSV读取数据:
- val df = spark.read.format("com.databricks.spark.csv")
- .schema(customSchema)
- .load("data.csv")
复制代码
从Spark 2.0.0开始,最好使用SparkSession,因为它提供了对sparkContext所具有的所有spark功能的访问。 此外,它还提供了用于处理DataFrame和DataSet的API
运行SQL查询
SparkSession可用于对数据执行SQL查询,将结果作为Data-Frame(即数据集[ROW])返回。
- display(spark.sql("Select * from TimeStamp"))
-
-
- +--------------------+-----------+----------+-----+
- | TimeStamp|Temperature| date| Time|
- +--------------------+-----------+----------+-----+
- |2010-02-25T05:42:...| 79.48|2010-02-25|05:42|
- |2010-02-25T05:42:...| 59.27|2010-02-25|05:42|
- |2010-02-25T05:42:...| 97.98|2010-02-25|05:42|
- |2010-02-25T05:42:...| 91.41|2010-02-25|05:42|
- |2010-02-25T05:42:...| 60.67|2010-02-25|05:42|
- |2010-02-25T05:42:...| 61.41|2010-02-25|05:42|
- |2010-02-25T05:42:...| 93.6|2010-02-25|05:42|
- |2010-02-25T05:42:...| 50.32|2010-02-25|05:42|
- |2010-02-25T05:42:...| 64.69|2010-02-25|05:42|
- |2010-02-25T05:42:...| 78.57|2010-02-25|05:42|
- |2010-02-25T05:42:...| 66.89|2010-02-25|05:42|
- |2010-02-25T05:42:...| 62.87|2010-02-25|05:42|
- |2010-02-25T05:42:...| 74.32|2010-02-25|05:42|
- |2010-02-25T05:42:...| 96.55|2010-02-25|05:42|
- |2010-02-25T05:42:...| 71.93|2010-02-25|05:42|
- |2010-02-25T05:42:...| 79.17|2010-02-25|05:42|
- |2010-02-25T05:42:...| 73.89|2010-02-25|05:42|
- |2010-02-25T05:42:...| 80.97|2010-02-25|05:42|
- |2010-02-25T05:42:...| 81.04|2010-02-25|05:42|
- |2010-02-25T05:42:...| 53.05|2010-02-25|05:42|
- +--------------------+-----------+----------+-----+
- only showing top 20 rows
复制代码
使用配置选项
SparkSession还可用于设置运行时配置选项,这些选项可以切换优化器行为或I / O(即Hadoop)行为。
Spark.conf.get(“Spark.Some.config”,” ABCD”)
Spark.conf.get(“Spark.Some.config”)
和配置选项集也可以在SQL中使用变量替换
%Sql select “$ {spark.some.config}”
直接使用元数据
SparkSession还包括一个目录(catalog )方法,其中包含使用Metastore(即数据目录)的方法。 方法返回数据集,以便可以使用相同的数据集API来使用它们。
1.获取当前数据库中的表列表
val tables =spark.catalog.listTables()
display(tables)
- +----+--------+-----------+---------+-----------+
- |name|database|description|tableType|isTemporary|
- +----+--------+-----------+---------+-----------+
- |Stu |default |null |Managed |false |
- +----+--------+-----------+---------+-----------+
复制代码
2.使用dataset API filter 名称
display(tables.filter(_.name contains “son”)))
- +----+--------+-----------+---------+-----------+
- |name|database|description|tableType|isTemporary|
- +----+--------+-----------+---------+-----------+
- |Stu |default |null |Managed |false |
- +----+--------+-----------+---------+-----------+
复制代码
获取表的列列表
display(spark.catalog.listColumns(“smart”))
- +-----+----------+----------+-----------+-------------+--------+
- |name |description|dataType |nullable |isPartitioned|isbucket|
- +-----+-----------+---------+-----------+-------------+--------+
- |email|null |string |true |false |false |
- +-----+-----------+---------+-----------+-------------+--------+
- |iq |null |bigInt |true |false |false |
- +-----+-----------+---------+-----------+-------------+--------+
复制代码
访问底层的SparkContext
SparkSession.sparkContext返回基础sparkContext,用于创建RDD以及管理群集资源。
Spark.sparkContext
- res17: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2debe9ac
复制代码
可以查看以下完整代码:
SparkContext-demo
https://github.com/knoldus/sparkConf-demo
sparkSession-demo
https://github.com/knoldus/sparkSession-demo
|