Spark(1.6.1) Sql 编程指南+实战案例分析
问题导读:1、Spark SQL操作流程有哪些?
2、如何加载/保存数据源?
3、保存模式有哪些?
static/image/hrline/3.gif
首先看看从官网学习后总结的一个思维导图
概述(Overview)
Spark SQL是Spark的一个模块,用于结构化数据处理。它提供了一个编程的抽象被称为DataFrames,也可以作为分布式SQL查询引擎。
开始Spark SQL
Spark SQL中所有功能的入口点是SQLContext类,或者它子类中的一个。为了创建一个基本的SQLContext,你所需要的是一个SparkContext。
除了基本的SQLContext,你还可以创建一个HiveContext,它提供了基本的SQLContext的所提供的功能的超集。这些功能中包括附加的特性,可以编写查询,使用更完全的HiveQL解析器,访问Hive UDFs,能够从Hive表中读取数据。现在暂不研究,以后学习
<span style="white-space:pre"> </span>SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrames
DataFrame是一种以命名列方式组织的分布式数据集。它概念上相当于关系型数据库中的表,或者R/Python中的数据帧,但是具有更丰富的优化。有很多方式可以构造出一个DataFrame,例如:结构化数据文件,Hive中的tables,外部数据库或者存在的RDDs.
DataFrame的API适用于Scala、Java和Python.
该页上所有的例子使用Spark分布式中的样本数据,可以运行在spark-shell或者pyspark shell中。
创建DataFrames(Creating DataFrames)
使用SQLContext,应用可以从一个已经存在的RDD、Hive表或者数据源中创建DataFrames。
例如,以下根据一个JSON文件创建出一个DataFrame:
package com.tg.spark.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
* 根据一个JSON文件创建出一个DataFrame:
* @author 汤高
*
*/
public class DataFrameOps {
public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000"); //因为jvm无法获得足够的资源
//JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().json("hdfs://master:9000/testFile/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// agename
// null Michael
// 30 Andy
// 19 Justin
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name (age + 1)
// Michael null
// Andy 31
// Justin20
// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30Andy
// Count people by age
df.groupBy("age").count().show();
// agecount
// null 1
// 19 1
// 30 1
}
}
SQLContext中的sql函数使应用可以以编程方式运行SQL查询,并且将结果以DataFrame形式返回。具体案例见后面
Spark SQL支持两种不同的方法,用于将存在的RDDs转换成DataFrames。第一种方法使用反射来推断包含特定类型的对象的RDD的模式。在写Spark应用时,当你已知schema的情况下,这种基于反射的方式使得代码更加简介,并且效果更好。
创建DataFrames的第二种方法是通过编程接口,它允许你构建一个模式,然后将其应用到现有的RDD上。这种方式更加的繁琐,它允许你构建一个DataFrame当列以及类型未知,直到运行时才能知道时。
使用反射推断模式(Inferring the Schema Using Reflection)
知道RDD格式的前提下
JavaBeans类定义了表的模式,JavaBeans类的参数的名称使用反射来读取,然后称为列的名称。
JavaBeans类还可以嵌套或者包含复杂的类型,例如Sequences或者Arrays。
这个RDD可以隐式地转换为DataFrame,然后注册成表,
表可以在后续SQL语句中使用Spark SQL中的Scala接口支持自动地将包含JavaBeans类的RDD转换成DataFrame。
步骤:
1、使用JavaBeans类定义schema
2、创建一个SQLContext
3、通过调用createDataFrame方法模式应用到所有现有的RDD,并为JavaBean提供class对象达到将RDD转换成DataFrame
4、创建一个DataFrame,并将它注册成表。
5、使用sqlContext提供的sql方法,就可以使用SQL语句来查询了。查询后返回的结果是DataFrame,它支持所有的RDD操作
首先写一个JavaBean类,实现序列化接口,并提供get和set方法
package com.tg.spark.sql;
import scala.Serializable;
public class Person implements Serializable {
/**
*
*/
private static final long serialVersionUID = 727694963564948838L;
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
package com.tg.spark.sql;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import java.util.List;
import org.apache.spark.SparkConf;
public class CreateDataFrame1 {
public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000"); //因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("hdfs://master:9000/testFile/people.txt").map(
new Function<String, Person>() {
public Person call(String line) throws Exception {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts);
person.setAge(Integer.parseInt(parts.trim()));
return person;
}
});
//A schema can be applied to an existing RDD by calling createDataFrame and providing the Class object for the JavaBean.
// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
teenagers.persist(StorageLevel.MEMORY_ONLY());
System.out.println(teenagerNames);
}
}
上面的这段代码
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
SQLContext中的sql函数使应用可以以编程方式运行SQL查询,并且将结果以DataFrame形式返回
以编程方式指定模式(Programmatically Specifying the Schema)
不知道RDD的列和它的类型时
步骤:
1.从原有的RDD中创建包含行的RDD。
2.创建一个由StructType表示的模式,StructType符合由步骤1创建的RDD的行的结构。
3.通过SQLContext提供的createDataFrame方法,将模式应用于包含行的RDD。
package com.tg.spark.sql;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
public class CreateDataFrame2 {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("hdfs://master:9000/testFile/people.txt");
// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
public Row call(String record) throws Exception {
String[] fields = record.split(",");
return RowFactory.create(fields, fields.trim());
}
});
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName : schemaString.split(" ")) {
// true表示可以为空
fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal
// RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
results.persist(StorageLevel.MEMORY_ONLY());
System.out.println(names);
}
}
数据源(Data Sources)
Spark SQL支持通过DataFrame接口在多种数据源上进行操作。一个DataFrame可以如同一个标准的RDDs那样进行操作,还可以注册成临时的表。将一个DataFrame注册成临时表允许你在它的数据上运行SQL查询。本节介绍使用Spark数据源装载和保存数据的常用方法,使用Spark数据源保存数据。然后进入可用于内置数据源的特定选项。
通用的加载/保存功能(Generic Load/Save Functions)
在最简单的形式中,默认的数据源(parquet除非通过spark.sql.sources.default另外进行配置)将被用于所有的操作。
package com.tg.spark.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.storage.StorageLevel;
/**
* 加载默认的数据源格式并保存
* //第一种读取方式xxxFile(path)
* @author Administrator
*
*/
public class DataSource {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().load("hdfs://master:9000/testFile/users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
//指定保存模式
//df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
//第一种读取方式
DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet");
parquetFile.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people ");
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
teenagers.persist(StorageLevel.MEMORY_ONLY());
System.out.println(teenagerNames);
}
}
手动指定选项(Manually Specifying Options)
你还可以手动指定数据源,这些数据源将与任何额外的选项一同使用,你希望将这些选项传入到数据源中。数据源是通过它们的全名来指定的(如org.apache.spark.sql.parquet),但是对于内置的数据源,你也可以使用简短的名称(json, parquet, jdbc)。任何类型的DataFrames使用这些语法可以转化成其他的数据源:
package com.tg.spark.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
/**
* 加载指定的数据源格式并保存
* //第二种读取方式sqlContext.read().XXX(path)
* @author Administrator
*
*/
public class DataSource2 {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().format("json").load("hdfs://master:9000/testFile/people.json");
df.select("name", "age").write().format("parquet").save("people.parquet");
DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
teenagers.persist(StorageLevel.MEMORY_ONLY());
System.out.println(teenagerNames);
}
}
保存模式(Save Modes)
Save操作可以可选择性地接收一个SaveModel,如果数据已经存在了,指定如何处理已经存在的数据。意识到这些保存模式没有利用任何锁,也不是原子的,这很重要。因此,如果有多个写入者试图往同一个地方写入,这是不安全的。此外,当执行一个Overwrite,在写入新的数据之前会将原来的数据进行删除。
df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
Scala/JavaPythonMeaning
SaveMode.ErrorIfExists (default)"error" (default)When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. 当往一个数据源中保存一个DataFrame,如果数据已经存在,会抛出一个异常。
SaveMode.Append"append"When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. 当往一个数据源中保存一个DataFrame,如果data/table已经存在,DataFrame的内容会追加到已经存在的数据后面。
SaveMode.Overwrite"overwrite"Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. Overwrite模式意味着当向数据源中保存一个DataFrame时,如果data/table已经存在了,已经存在的数据会被DataFrame中内容覆盖掉。
SaveMode.Ignore"ignore"Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. Ignore模式意味着当向数据源中保存一个DataFrame时,如果数据已经存在,save操作不会将DataFrame的内容进行保存,也不会修改已经存在的数据。这与SQL中的`CREATE TABLE IF NOT EXISTS`相似。
Parquet 文件
Parquet是一种列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL支持度对Parquet文件的读和写,自动保存原有数据的模式。
代码上面用过一次
package com.tg.spark.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.storage.StorageLevel;
/**
* 加载默认的数据源格式并保存
* //第一种读取方式xxxFile(path)
* @author Administrator
*
*/
public class DataSource {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().load("hdfs://master:9000/testFile/users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
//指定保存模式
//df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
//第一种读取方式
DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet");
parquetFile.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people ");
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
teenagers.persist(StorageLevel.MEMORY_ONLY());
System.out.println(teenagerNames);
}
}
JSON数据集(JSON Datasets)
Spark SQL可以自动推断出JSON数据集的模式,将它作为DataFrame进行加载。这个转换可以通过使用SQLContext中的下面两个方法中的任意一个来完成。
• jsonFile - 从一个JSON文件的目录中加载数据,文件中的每一个行都是一个JSON对象。
• jsonRDD - 从一个已经存在的RDD中加载数据,每一个RDD的元素是一个包含一个JSON对象的字符串。
代码前面都有涉及到
public class DataSource3 {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
DataFrame people = sqlContext.read().json("hdfs://master:9000/testFile/people.json");
//DataFrame people = sqlContext.jsonFile("hdfs://master:9000/testFile/people.json");
// The inferred schema can be visualized using the printSchema() method.
people.printSchema();
// root
//|-- age: integer (nullable = true)
//|-- name: string (nullable = true)
// Register this DataFrame as a table.
people.registerTempTable("people");
// SQL statements can be run by using the sql methods provided by sqlContext.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);
anotherPeople.show();
}
}
Datasets
Datasets是新出的接口在1.6版本,为了使RDDS更便利(强类型,能使用强大的lambda函数),可以通过JVM对象构建或者通过熟练使用函数化转换得到(map, flatMap, filter, etc)
The unified Dataset API can be used both in Scala and Java. Python does not yet have support for the Dataset API, but due to its dynamic nature many of the benefits are already available (i.e. you can access the field of a row by name naturally row.columnName). Full python support will be added in a future release.
至于怎么用spark操作hive和其他数据库,以后再做学习
来源:http://blog.csdn.net/tanggao1314/article/details/51594942
很详情,楼主分析 谢谢楼主,正需要,赞~~ 很详情,谢谢楼主
页:
[1]