问题导读
1.DataFrame中本文使用了row哪些方法?
2.操作DataFrame row需要导入什么包?
3.teenagersDF.map(teenager => "Name: " + teenager(0)).show()中teenager 是什么trait?
前两篇文章
spark2 sql读取数据源编程学习样例1:程序入口、功能等知识详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23484
spark2 sql读取数据源编程学习样例2:函数实现详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23489
写了关于spark sql如何读取数据源,下面说下spark sql的各种操作。
如果你想一个spark sql程序,那么你会想,你到底该使用哪个包,如何嵌入sql语句,如何创建表,如何显示表内容,如何指定表显示字段。下面解决了我们这些问题。同样我们先附上代码:
代码
[mw_shl_code=scala,true]/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.sql
import org.apache.spark.sql.Row
// $example on:init_session$
import org.apache.spark.sql.SparkSession
// $example off:init_session$
// $example on:programmatic_schema$
// $example on:data_types$
import org.apache.spark.sql.types._
// $example off:data_types$
// $example off:programmatic_schema$
object SparkSQLExample {
// $example on:create_ds$
case class Person(name: String, age: Long)
// $example off:create_ds$
def main(args: Array[String]) {
// $example on:init_session$
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
// $example off:init_session$
runBasicDataFrameExample(spark)
runDatasetCreationExample(spark)
runInferSchemaExample(spark)
runProgrammaticSchemaExample(spark)
spark.stop()
}
private def runBasicDataFrameExample(spark: SparkSession): Unit = {
// $example on:create_df$
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:create_df$
// $example on:untyped_ops$
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
// $example off:untyped_ops$
// $example on:run_sql$
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:run_sql$
// $example on:global_temp_view$
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:global_temp_view$
}
private def runDatasetCreationExample(spark: SparkSession): Unit = {
import spark.implicits._
// $example on:create_ds$
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:create_ds$
}
private def runInferSchemaExample(spark: SparkSession): Unit = {
// $example on:schema_inferring$
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
// $example off:schema_inferring$
}
private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
import spark.implicits._
// $example on:programmatic_schema$
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
// $example off:programmatic_schema$
}
}[/mw_shl_code]接着我们阅读代码:
包名
[mw_shl_code=scala,true]package org.apache.spark.examples.sql
[/mw_shl_code]
同样还是先有一个自定义的包名org.apache.spark.examples.sql
导入包
[mw_shl_code=scala,true]import org.apache.spark.sql.Row
// $example on:init_session$
import org.apache.spark.sql.SparkSession
// $example off:init_session$
// $example on:programmatic_schema$
// $example on:data_types$
import org.apache.spark.sql.types._
// $example off:data_types$
// $example off:programmatic_schema$[/mw_shl_code]
上面我们导入了三个包,其中import org.apache.spark.sql.Row,import org.apache.spark.sql.types._是我们在读取数据源没有用到的。
[mw_shl_code=scala,true] case class Person(name: String, age: Long)
[/mw_shl_code]
这里是自定义了一个类,跟普通类稍微有一些区别。
详细参考:
scala中case class是什么?http://www.aboutyun.com/forum.php?mod=viewthread&tid=23464
程序入口
[mw_shl_code=scala,true]def main(args: Array[String]) {
// $example on:init_session$
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
// $example off:init_session$
runBasicDataFrameExample(spark)
runDatasetCreationExample(spark)
runInferSchemaExample(spark)
runProgrammaticSchemaExample(spark)
spark.stop()
}[/mw_shl_code]
上面跟spark读取数据源是一样的,不在重复,想了解可查看
spark2 sql读取数据源编程学习样例1:程序入口、功能等知识详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23484
runBasicDataFrameExample函数
[mw_shl_code=scala,true]private def runBasicDataFrameExample(spark: SparkSession): Unit = {
// $example on:create_df$
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:create_df$
// $example on:untyped_ops$
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
// $example off:untyped_ops$
// $example on:run_sql$
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:run_sql$
// $example on:global_temp_view$
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:global_temp_view$
}[/mw_shl_code]
这里面其实也有重复的,也不全部介绍了。其中
[mw_shl_code=scala,true] df.select("name").show()
[/mw_shl_code]
是一直显示自定字段name的列表,如下:
[mw_shl_code=scala,true] // +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+[/mw_shl_code]
[mw_shl_code=scala,true] df.select($"name", $"age" + 1).show()
[/mw_shl_code]
上面我们还可以对字段操作,将字段的age都加1,并显示,如下:
[mw_shl_code=scala,true] // +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+[/mw_shl_code]
从这里说明Scala已经变的跟传统数据库差不多了。
[mw_shl_code=scala,true] df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+[/mw_shl_code]
上面查询是年龄大于21岁的记录。
[mw_shl_code=scala,true] // Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+[/mw_shl_code]
上面则是分组,并统计。
[mw_shl_code=scala,true] // Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+[/mw_shl_code]
上面是注册一个sql临时视图,然后查询显示
[mw_shl_code=scala,true]// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+[/mw_shl_code]
我们看到这里有些区别带有global 。一个是people视图,一个是global_temp.people视图。
[mw_shl_code=scala,true] spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
[/mw_shl_code]
Global 临时视图是cross-session,也就是可能是不止在一个SparkSession中显示。当然这个后面在验证下。大家感兴趣也可以测试下。
runDatasetCreationExample函数
[mw_shl_code=scala,true]private def runDatasetCreationExample(spark: SparkSession): Unit = {
import spark.implicits._
// $example on:create_ds$
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:create_ds$
}[/mw_shl_code]
整体来说实现了由集合分别转换为DataFrame和dataset。
[mw_shl_code=scala,true] val caseClassDS = Seq(Person("Andy", 32)).toDS()
[/mw_shl_code]
上面是person类转换为序列,然后序列转换为DataFrame。
[mw_shl_code=scala,true] val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)[/mw_shl_code]
从上面可以看出集合可以转换为dataset,然后通过map映射分别都加1,然后通过collect函数显示。
[mw_shl_code=scala,true]val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+[/mw_shl_code]
上面其实很有意思
[mw_shl_code=scala,true]spark.read.json(path)[/mw_shl_code]这里其实为DataFrame,但是通过[mw_shl_code=scala,true]as[Person][/mw_shl_code]转换为了dataset,person则为case类。
runInferSchemaExample函数
[mw_shl_code=scala,true]private def runInferSchemaExample(spark: SparkSession): Unit = {
// $example on:schema_inferring$
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
// $example off:schema_inferring$
}
[/mw_shl_code]上面分别是rdd转换为DataFrame,以及DataFrame行的操作
[mw_shl_code=scala,true]val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()[/mw_shl_code]
上面代码便是读取people.txt为rdd,然后转换为DataFrame。这里面大部分也重复了。需要说明的
[mw_shl_code=scala,true] implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
[/mw_shl_code]
上面是map的序列化类mapEncoder 。
[mw_shl_code=scala,true] teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
[/mw_shl_code]
这里面值得说的getAs方法,它是DataFrame的row的方法,返回的是name字段的值
官网解释如下
[mw_shl_code=scala,true] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
[/mw_shl_code]
teenager.getValuesMap[Any](List("name", "age"))这里是row的一个方法getValuesMap,获取指定几列的值
官网解释如下:
关于DataFrame row的更多操作方法,可参考
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
runProgrammaticSchemaExample函数
[mw_shl_code=scala,true]private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
import spark.implicits._
// $example on:programmatic_schema$
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
// $example off:programmatic_schema$
}[/mw_shl_code]
这个函数主要实现了,将RDD转换DataFrame的过程。
[mw_shl_code=scala,true]// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))[/mw_shl_code]
上面分别是创建一个字符串schemaString ,然后对schemaString处理,通过StructField和StructType转换为schema ,rowRDD 则是由peopleRDD转换而来。[mw_shl_code=scala,true] val peopleDF = spark.createDataFrame(rowRDD, schema)
[/mw_shl_code]
最后这里生成了DataFrame。其它基本重复了,不在啰嗦。
相关文章:
spark2 sql读取json文件的格式要求
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23478
spark2 sql读取json文件的格式要求续:如何查询数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23483
spark2 sql编程之实现合并Parquet格式的DataFrame的schema
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23518
spark2 sql编程样例:sql操作
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23501
spark2 sql读取数据源编程学习样例1:程序入口、功能等知识详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23484
spark2 sql读取数据源编程学习样例2:函数实现详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23489
spark2 sql编程之实现合并Parquet格式的DataFrame的schema
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23518
|
|