分享

spark2 sql去哪读取数据

问题导读

1.spark SQL与传统数据库的区别在什么地方?
2.spark SQL独立使用,是否还需要hive?
3.Spark SQL都可以读取哪些数据源?
4.Spark SQL如何实现读取其它数据库?



网上很多spark sql的资料,但是在实际操作中,似乎我们还不能真正理解spark sql。只是一味的模仿,甚至我们已经在使用它,但是依然只看到其中一部分。这里思考几个问题?
spark sql去哪读取数据?
它到底是什么?
如何去理解它?
如何实际操作?
该如何使用它?
怎么样才能看到效果?

spark sql是spark为处理结构化数据而产生的一个模块。同样也是为了适应习惯使用sql的使用者,方便用它操作数据。那这里就有一个问题?既然是sql语句,那么会像关系型数据库一样,去读取表的数据,然后用各种sql操作、组合数据吗?我们知道 mysql自然是读取mysql的表,oracle理所当然去oracle表去读取数据。那么spark sql应该去哪读取数据?spark sql是否也是需要先有表,然后再去读数据?

spark sql去哪读取数据?
如果你阅读过很多spark资料,那么这时候你就会想到spark有多个数据源。比如hive,Json,读取Parquet文件,文本文件,关系数据库等。那么该如何操作读取这些数据源,它有哪些函数?


1.读取hive
spark sql包括sqlContext和hiveContext,hiveContext能读取hive数据库,所以hive是spark很重要的一个数据源,而且很多公司也是这么做的。
说明:
有一个非常重要的内容,就是spark2中SparkSession封装了这两个类.hiveContext在后面的版本将会被弃用,使用 SparkSession.builder.enableHiveSupport来替代。
为了更直观的认识,这里贴下代码[mw_shl_code=scala,true]import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...[/mw_shl_code]
如果要读取hive,还需要做一些配置,并且安装spark的客户端,也必须安装hive,这样spark才能找到依赖。配置文件 hive-site.xml, core-site.xml和hdfs-site.xml复制到conf目录下。如果没有配置 hive-site.xml,context会自动创建metastore_db。如下图

创建数据库.jpg
其实这是spark创建的默认本地Hive metastore。值得说明的是数据库的位置是由spark.sql.warehouse.dir配置属性指定的,默认是在$spark_home/bin目录。spark2.0中 hive-site.xml的hive.metastore.warehouse.dir已经被弃用,使用spark.sql.warehouse.dir来指定数据仓库的位置。
对上面代码做一个简单的说明:与原先文章重复的代码,不在做解释
spark2 sql读取数据源编程学习样例1
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23484


spark2 sql读取数据源编程学习样例2:函数实现详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23489

从上面我们看出spark使用的DataFrame.sql嵌入sql语句。对于hive则是导入包[mw_shl_code=scala,true]import spark.sql
[/mw_shl_code]然后通过sql函数直接嵌入sql语句。如下
[mw_shl_code=scala,true]import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()[/mw_shl_code]
########################
我们可能遇到过在开发环境中,我们将hive-site.xml配置文件也复制到开发环境中,可是出现一种情况,在数据库中,只能找到default表,不能找到自己创建的表。这是因为我们并没有真正的连接hive server,而是使用的spark自己默认创建本地hive metastore。如果使用本地hive,我们所创建的表都会在spark的spark-warehouse中找到
1.jpg
所以虽然我们配置文件复制到了环境中,但是并没有起作用,那么怎么才能起作用。我们可以通过配置类,在代码中设置,连接远程hive。



2.读取Json文件

hdfs根目录存储了people.json内容如下:
[mw_shl_code=text,true][
    {
        "name": "aboutyun",
        "age": "4"
    },
    {
        "name": "baidu",
        "age": "5"
    }
][/mw_shl_code]
我们通过
[mw_shl_code=bash,true]spark-shell[/mw_shl_code]命令,进入spark-shell
spark shell.jpg

里面比较重要的信息
Spark context available as 'sc' (master = spark://master:7077, app id = app-20171206194346-0003).
Spark session available as 'spark'.

也就是说spark替我们定义了两个变量,spark,和sc.我们这里使用spark读取文件内容
[mw_shl_code=scala,true]val peopleDF=spark.read.json("/people.json")[/mw_shl_code]
打印schema
[mw_shl_code=scala,true]peopleDF.printSchema[/mw_shl_code]


接着我们来显示数据
[mw_shl_code=scala,true]peopleDF.show[/mw_shl_code]



关于json数据读取更多可参考
spark2 sql读取json文件的格式要求
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23478

3.读取Parquet文件
我们可能没有Parquet格式的文件,这里json格式文件,通过DataFrame转换为parquet文件,然后DataFrame在读取文件。操作如下首先导入包
[mw_shl_code=scala,true]import spark.implicts._[/mw_shl_code]
然后读取json文件,这个文件不了解的可参考
spark2 sql读取json文件的格式要求
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23478
其内容为:

[mw_shl_code=text,true][
    {
        "name": "aboutyun",
        "age": "4"
    },
    {
        "name": "baidu",
        "age": "5"
    }
][/mw_shl_code]
[mw_shl_code=scala,true]val popleDF=spark.read.json("/people.json")[/mw_shl_code]
读取之后,我们通过DataFrame以parquet的格式保存。
[mw_shl_code=scala,true]peopleDF.write.parquet("people.parquet")[/mw_shl_code]
保存之后,我们通过下面read.parquet来读取未DataFrame
[mw_shl_code=scala,true]val parquetFileDF=spark.read.parquet("people.parquet")[/mw_shl_code]
然后注册为临时表
[mw_shl_code=scala,true] parquetFileDF.createOrReplaceTempView("parquetFile")[/mw_shl_code]
剩下的我们就比较熟悉了,比如查询name为aboutyun的记录
[mw_shl_code=scala,true]val nameDF=spark.sql("select * from parquetFile where name='aboutyun'")[/mw_shl_code]
显示如下
[mw_shl_code=scala,true]nameDF.show()[/mw_shl_code]
show.jpg

如果我们想对DataFrame的显示做一个修改让它只显示name,我们可以做如下操作
[mw_shl_code=scala,true]nameDF.map(x=>"name"+x(1)).show()[/mw_shl_code]
showdingz.jpg


3.读取其它数据库
Spark SQL可以使用JDBC读取其它数据库。它优先于JdbcRDD,因为结果会返回DataFrame。使用spark sql处理非常容易,而且可以和其它数据源join。
使用Java和Python使用JDBC非常容易,不需要额外的包。(注意它与 Spark SQL JDBC server不同)。它运行其它应用程序使用Spark SQL查询。
启动的时候,你需要指定 spark classpath的数据库的JDBC 驱动,例如从spark-shell连接postgres,运行下面命令
[mw_shl_code=scala,true]bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
[/mw_shl_code]
远程数据库的表可以使用数据源API加载为 DataFrame 或则Spark SQL 临时视图。用户可以指定数据库连接的属性。登陆数据源一般指定user 和password,由数据源options指定。

代码示例:
[mw_shl_code=scala,true]// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)[/mw_shl_code]

当然还有其它数据源,更多资料参考
solr作为spark sql的数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21888

通过Spark DataSource API 如何实现Rest数据源
http://www.aboutyun.com/forum.php?mod=viewthread&tid=19291

Spark多数据源计算实践及其在GrowingIO的实践
http://www.aboutyun.com/forum.php?mod=viewthread&tid=18615


相关文章推荐:
spark2 sql读取json文件的格式要求
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23478


spark2 sql读取json文件的格式要求续:如何查询数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23483

spark2 sql编程之实现合并Parquet格式的DataFrame的schema
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23518

spark2 sql编程样例:sql操作
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23501

spark2 sql读取数据源编程学习样例1:程序入口、功能等知识详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23484


spark2 sql读取数据源编程学习样例2:函数实现详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23489


spark2 sql编程之实现合并Parquet格式的DataFrame的schema
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23518

spark2 sql编程之实现合并Parquet格式的DataFrame的schema
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23518





没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条