本帖最后由 a87758133 于 2019-3-26 12:27 编辑
问题导读
1.什么是SQL?
2.大数据计算领域对SQL的应用有哪些?
3.SQL有什么特性?
4.Flink SQL Job的组成有哪些?
5.Flink SQL 核心算子有哪些?
SQL简述SQL是Structured Query Language的缩写,最初是由美国计算机科学家 Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Query Language](结构化英语查询语言),旨在操纵和检索存储在IBM原始准关系数据库管理系统System R中的数据。SEQUEL后来改为SQL,因为“SEQUEL”是英国Hawker Siddeley飞机公司的商标。我们看看这款用于特技飞行的英国皇家空军豪客Siddeley Hawk T.1A (Looks great):
第一款SQL数据库在20世纪70年代后期,Oracle公司(当时叫 Relational Software,Inc.)开发了基于SQL的RDBMS,并希望将其出售给美国海军,Central Intelligence代理商和其他美国政府机构。 1979年6月,Oracle 公司为VAX计算机推出了第一个商业化的SQL实现,即Oracle V2。
ANSI-SQL标准的采用直到1986年,ANSI和ISO标准组正式采用了标准的"数据库语言SQL"语言定义。该标准的新版本发布于1989,1992,1996,1999,2003,2006,2008,2011,以及最近的2016。Apache Flink SQL 核心算子的语义设计也参考了 1992 、 2011等ANSI-SQL标准。
SQL操作及扩展SQL是专为查询包含在关系数据库中的数据而设计的,是一种基于SET操作的声明性编程语言,而不是像C语言一样的命令式编程语言。但是,各大关系数据库厂商在遵循ANSI-SQL标准的同时又对标准SQL进行扩展,由基于SET(无重复元素)的操作扩展到基于BAG(有重复元素)的操作,并且添加了过程编程语言功能,如:Oracle的PL/SQL, DB2的SQL PL,MySQL - SQL/PSM以及SQL Server的T-SQL等等。
随着时间的推移ANSI-SQL规范不断完善,所涉及的功能不断丰富,比如在ANSI-2011中又增加了Temporal Table的标准定义,Temporal Table的标准在结构化关系数据存储上添加了时间维度信息,这使得关系数据库中不仅可以对当前数据进行查询操作,根据时间版本信息也可以对历史数据进行操作。这些不断丰富的功能极大增强了SQL的应用领域。
大数据计算领域对SQL的应用离线计算(批计算)提及大数据计算领域不得不说MapReduce计算模型,MapReduce最早是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法,并发于2004年发表了论文 Simplified Data Processing on Large Clusters。
论文发表之后Apache 开源社区参考Google MapReduce,基于Java设计开发了一个称为Hadoop的开源MapReduce并行计算框架。很快得到了全球学术界和工业界的普遍关注,并得到推广和普及应用。
但利用Hadoop进行MapReduce的开发,需要开发人员精通Java语言,并了解MapReduce的运行原理,这样在一定程度上提高了MapReduce的开发门槛,所以在开源社区又不断涌现了一些为了简化MapReduce开发的开源框架,其中Hive就是典型的代表。HSQL可以让用户以类SQL的方式描述MapReduce计算,比如原本需要几十行,甚至上百行才能完成的wordCount,用户一条SQL语句就能完成了,这样极大的降低了MapReduce的开发门槛,进而也成功的将SQL应用到了大数据计算领域当中来。
实时计算(流计算)SQL不仅仅被成功的应用到了离线计算,SQL的易用性也吸引了流计算产品,目前最热的Spark,Flink也纷纷支持了SQL,尤其是Flink支持的更加彻底,集成了Calcite,完全遵循ANSI-SQL标准。Apache Flink在low-level API上面用DataSet支持批计算,用DataStream支持流计算,但在High-Level API上面利用SQL将流与批进行了统一,使得用户编写一次SQL既可以在流计算中使用,又可以在批计算中使用,为既有流计算业务,又有批计算业务的用户节省了大量开发成本。
SQL高性能与简洁性性能SQL经过传统数据库领域几十年的不断打磨,查询优化器已经能够极大的优化SQL的查询性能,Apache Flink 应用Calcite进行查询优化,复用了大量数据库查询优化规则,在性能上不断追求极致,能够让用户关心但不用担心性能问题。如下图(Alibaba 对 Apache Flink 进行架构优化后的组件栈) 相对于DataStream而言,SQL会经过Optimization模块透明的为用户进行查询优化,用户专心编写自己的业务逻辑,不用担心性能,却能得到最优的查询性能!
简洁就简洁性而言,SQL与DataSet和DataStream相比具有很大的优越性,我们先用一个WordCount示例来直观的查看用户的代码量:
[mw_shl_code=java,true]... //省略初始化代码
// 核心逻辑
text.flatMap(new WordCount.Tokenizer()).keyBy(new int[]{0}).sum(1);
// flatmap 代码定义
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public Tokenizer() {
}
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
String[] var4 = tokens;
int var5 = tokens.length;
for(int var6 = 0; var6 < var5; ++var6) {
String token = var4[var6];
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}[/mw_shl_code] [mw_shl_code=sql,true]...//省略初始化代码
SELECT word, COUNT(word) FROM tab GROUP BY word;[/mw_shl_code] 我们直观的体会到相同的统计功能使用SQL的简洁性。
Flink SQL Job的组成我们做任何数据计算都离不开读取原始数据,计算逻辑和写入计算结果数据三部分,当然基于Apache Flink SQL编写的计算Job也离不开这三个部分,如下所示: 如上所示,一个完整的Apache Flink SQL Job 由如下三部分:
- Source Operator - Soruce operator是对外部数据源的抽象, 目前Apache Flink内置了很多常用的数据源实现,比如上图提到的Kafka。
- Query Operators - 查询算子主要完成如图的Query Logic,目前支持了Union,Join,Projection,Difference, Intersection以及window等大多数传统数据库支持的操作。
- Sink Operator - Sink operator 是对外结果表的抽象,目前Apache Flink也内置了很多常用的结果表的抽象,比如上图提到的Kafka。
Flink SQL 核心算子
目前Flink SQL支持Union,Join,Projection,Difference, Intersection以及Window等大多数传统数据库支持的操作,接下来为大家分别进行简单直观的介绍。 环境为了很好的体验和理解Apache Flink SQL算子我们需要先准备一下测试环境,我们选择 IDEA,以ITCase测试方式来进行体验。IDEA 安装这里不占篇幅介绍了,相信大家能轻松搞定!我们进行功能体验有两种方式,具体如下: 源码方式对于开源爱好者可能更喜欢源代码方式理解和体验Apache Flink SQL功能,那么我们需要下载源代码并导入到IDEA中: [mw_shl_code=shell,true]// 下载源代码
git clone https://github.com/apache/flink.git study
// 进入源码目录
cd study
// 拉取稳定版release-1.6
git fetch origin release-1.6:release-1.6
//切换到稳定版
git checkout release-1.6
//将依赖安装到本地mvn仓库,耐心等待需要一段时间
mvn clean install -DskipTests[/mw_shl_code] 将Flink源码导入到IDEA过程这里不再占用篇幅,导入后确保在IDEA中可以运行 org.apache.flink.table.runtime.stream.sql.SqlITCase 并测试全部通过,即证明体验环境已经完成。如下图所示: 如上图运行测试后显示测试通过,我们就可以继续下面的Apache Flink SQL功能体验了。 依赖Flink包方式我们还有一种更简单直接的方式,就是新建一个mvn项目,并在pom中添加如下依赖: [mw_shl_code=xml,true]<properties>
<table.version>1.6-SNAPSHOT</table.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>JUnit</groupId>
<artifactId>JUnit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>[/mw_shl_code] 完成环境准备后,我们开始准备测试数据和写一个简单的测试类。 示例数据及测试类测试数据customer_tab 表 - 客户表保存客户id,客户姓名和客户描述信息。字段及测试数据如下:
c_idc_namec_descc_001Kevinfrom JinLinc_002Sunnyfrom JinLinc_003JinChengfrom HeBei
- order_tab 表 - 订单表保存客户购买的订单信息,包括订单id,订单时间和订单描述信息。 字段节测试数据如下:
o_idc_ido_timeo_desco_oo1c_0022018-11-05 10:01:01iphoneo_002c_0012018-11-05 10:01:55ipado_003c_0012018-11-05 10:03:44flink book
- Item_tab 商品表, 携带商品id,商品类型,出售时间,价格等信息,具体如下:
itemIDitemTypeonSellTimepriceITEM001Electronic2017-11-11 10:01:0020ITEM002Electronic2017-11-11 10:02:0050ITEM003Electronic2017-11-11 10:03:0030ITEM004Electronic2017-11-11 10:03:0060ITEM005Electronic2017-11-11 10:05:0040ITEM006Electronic2017-11-11 10:06:0020ITEM007Electronic2017-11-11 10:07:0070ITEM008Clothes2017-11-11 10:08:0020
- PageAccess_tab 页面访问表,包含用户ID,访问时间,用户所在地域信息,具体数据如下:
regionuserIdaccessTimeShangHaiU00102017-11-11 10:01:00BeiJingU10012017-11-11 10:01:00BeiJingU20322017-11-11 10:10:00BeiJingU11002017-11-11 10:11:00ShangHaiU00112017-11-11 12:10:00
- PageAccessCount_tab 页面访问表,访问量,访问时间,用户所在地域信息,具体数据如下:
regionuserCountaccessTimeShangHai1002017.11.11 10:01:00BeiJing862017.11.11 10:01:00BeiJing2102017.11.11 10:06:00BeiJing332017.11.11 10:10:00ShangHai1292017.11.11 12:10:00
- PageAccessSession_tab 页面访问表,访问量,访问时间,用户所在地域信息,具体数据如下:
regionuserIdaccessTimeShangHaiU00112017-11-11 10:01:00ShangHaiU00122017-11-11 10:02:00ShangHaiU00132017-11-11 10:03:00ShangHaiU00152017-11-11 10:05:00ShangHaiU00112017-11-11 10:10:00BeiJingU01102017-11-11 10:10:00ShangHaiU20102017-11-11 10:11:00ShangHaiU04102017-11-11 12:16:00
测试类我们创建一个SqlOverviewITCase.scala 用于接下来介绍Flink SQL算子的功能体验。代码如下: [mw_shl_code=scala,true]import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class SqlOverviewITCase {
val _tempFolder = new TemporaryFolder
@Rule
def tempFolder: TemporaryFolder = _tempFolder
def getStateBackend: StateBackend = {
new MemoryStateBackend()
}
// 客户表数据
val customer_data = new mutable.MutableList[(String, String, String)]
customer_data.+=(("c_001", "Kevin", "from JinLin"))
customer_data.+=(("c_002", "Sunny", "from JinLin"))
customer_data.+=(("c_003", "JinCheng", "from HeBei"))
// 订单表数据
val order_data = new mutable.MutableList[(String, String, String, String)]
order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))
// 商品销售表数据
val item_data = Seq(
Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
Right((1510365660000L)),
Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
Right((1510365720000L)),
Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
Right((1510365780000L)),
Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
Right((1510365900000L)),
Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
Right((1510365960000L)),
Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
Right((1510366020000L)),
Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
Right((151036608000L)))
// 页面访问表数据
val pageAccess_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
Right((1510365660000L)),
Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
Right((1510365660000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
Right((1510366200000L)),
Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
Right((1510366260000L)),
Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
Right((1510373400000L)))
// 页面访问量表数据2
val pageAccessCount_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
Right((1510365660000L)),
Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
Right((1510365660000L)),
Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
Right((1510366200000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
Right((1510366200000L)),
Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
Right((1510373400000L)))
// 页面访问表数据3
val pageAccessSession_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
Right((1510365660000L)),
Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
Right((1510365720000L)),
Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
Right((1510365720000L)),
Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
Right((1510365900000L)),
Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
Right((1510366200000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
Right((1510366200000L)),
Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
Right((1510366260000L)),
Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
Right((1510373760000L)))
def procTimePrint(sql: String): Unit = {
// Streaming 环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 将order_tab, customer_tab 注册到catalog
val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)
tEnv.registerTable("order_tab", order)
tEnv.registerTable("customer_tab", customer)
val result = tEnv.sqlQuery(sql).toRetractStream[Row]
val sink = new RetractingSink
result.addSink(sink)
env.execute()
}
def rowTimePrint(sql: String): Unit = {
// Streaming 环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(getStateBackend)
env.setParallelism(1)
val tEnv = TableEnvironment.getTableEnvironment(env)
// 将item_tab, pageAccess_tab 注册到catalog
val item =
env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
.toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)
val pageAccess =
env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
.toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
val pageAccessCount =
env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
.toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)
val pageAccessSession =
env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
.toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
tEnv.registerTable("item_tab", item)
tEnv.registerTable("pageAccess_tab", pageAccess)
tEnv.registerTable("pageAccessCount_tab", pageAccessCount)
tEnv.registerTable("pageAccessSession_tab", pageAccessSession)
val result = tEnv.sqlQuery(sql).toRetractStream[Row]
val sink = new RetractingSink
result.addSink(sink)
env.execute()
}
@Test
def testSelect(): Unit = {
val sql = "替换想要测试的SQL"
// 非window 相关用 procTimePrint(sql)
// Window 相关用 rowTimePrint(sql)
}
}
// 自定义Sink
final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
def invoke(v: (Boolean, Row)) {
retractedResults.synchronized {
val value = v._2.toString
if (v._1) {
retractedResults += value
} else {
val idx = retractedResults.indexOf(value)
if (idx >= 0) {
retractedResults.remove(idx)
} else {
throw new RuntimeException("Tried to retract a value that wasn't added first. " +
"This is probably an incorrectly implemented test. " +
"Try to set the parallelism of the sink to 1.")
}
}
}
retractedResults.sorted.foreach(println(_))
}
}
// Water mark 生成器
class EventTimeSourceFunction[T](
dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
override def run(ctx: SourceContext[T]): Unit = {
dataWithTimestampList.foreach {
case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
case Right(w) => ctx.emitWatermark(new Watermark(w))
}
}
override def cancel(): Unit = ???
}[/mw_shl_code]
(1)SelectSELECT 用于从数据集/流中选择数据,语法遵循ANSI-SQL标准,语义是关系代数中的投影(Projection),对关系进行垂直分割,消去某些列, 如下图所示:
SQL 示例从customer_tab选择用户姓名,并用内置的CONCAT函数拼接客户信息,如下: [mw_shl_code=sql,true]SELECT c_name, CONCAT(c_name, ' come ', c_desc) as desc FROM customer_tab;
[/mw_shl_code] Resultc_namedescKevinKevin come from JinLinSunnySunny come from JinLinJinchengJincheng come from HeBei 特别说明大家看到在 SELECT 不仅可以使用普通的字段选择,还可以使用ScalarFunction,当然也包括User-Defined Function,同时还可以进行字段的alias设置。其实SELECT可以结合聚合,在GROUPBY部分会进行介绍,一个比较特殊的使用场景是携带 DISTINCT 关键字,示例如下: SQL 示例在订单表查询所有的客户id,消除重复客户id, 如下: [mw_shl_code=sql,true]SELECT DISTINCT c_id FROM order_tab;
[/mw_shl_code] Resultc_idc_001c_002
(2)WHEREWHERE 用于从数据集/流中过滤数据,与SELECT一起使用,语法遵循ANSI-SQL标准,语义是关系代数的Selection,根据某些条件对关系做水平分割,即选择符合条件的记录,如下所示:
SQL 示例在customer_tab查询客户id为c_001和c_003的客户信息,如下: [mw_shl_code=sql,true]SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id = 'c_001' OR c_id = 'c_003';
[/mw_shl_code] Resultc_idc_namec_descc_001Kevinfrom JinLinc_003JinChengfrom HeBei 特别说明我们发现WHERE是对满足一定条件的数据进行过滤,WHERE支持=, <, >, <>, >=, <=以及AND, OR等表达式的组合,最终满足过滤条件的数据会被选择出来。并且 WHERE 可以结合IN,NOT IN联合使用,具体如下:
SQL 示例 (IN 常量)使用 IN 在customer_tab查询客户id为c_001和c_003的客户信息,如下: [mw_shl_code=sql,true]SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN ('c_001', 'c_003');
[/mw_shl_code] Resultc_idc_namec_descc_001Kevinfrom JinLinc_003JinChengfrom HeBei SQL 示例 (IN 子查询)使用 IN和 子查询 在customer_tab查询已经下过订单的客户信息,如下: [mw_shl_code=sql,true]SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN (SELECT c_id FROM order_tab);[/mw_shl_code] Resultc_idc_namec_descc_001Kevinfrom JinLinc_002Sunnyfrom JinLin
(3)IN/NOT IN 与关系代数如上介绍IN是关系代数中的Intersection, NOT IN是关系代数的Difference, 如下图示意: IN(Intersection)
(4)GROUP BYGROUP BY 是对数据进行分组的操作,比如我需要分别计算一下一个学生表里面女生和男生的人数分别是多少,如下:
SQL 示例将order_tab信息按customer_tab分组统计订单数量,简单示例如下: [mw_shl_code=sql,true]SELECT c_id, count(o_id) as o_count FROM order_tab GROUP BY c_id;
[/mw_shl_code] Resultc_ido_countc_0012c_0021 特别说明在实际的业务场景中,GROUP BY除了按业务字段进行分组外,很多时候用户也可以用时间来进行分组(相当于划分窗口),比如统计每分钟的订单数量: SQL 示例按时间进行分组,查询每分钟的订单数量,如下: [mw_shl_code=sql,true]SELECT SUBSTRING(o_time, 1, 16) AS o_time_min, count(o_id) AS o_count FROM order_tab GROUP BY SUBSTRING(o_time, 1, 16)
[/mw_shl_code] Resulto_time_mino_count2018-11-05 10:0122018-11-05 10:031 说明:如果我们时间字段是timestamp类型,建议使用内置的 DATE_FORMAT 函数。
(5)UNION ALLUNION ALL 将两个表合并起来,要求两个表的字段完全一致,包括字段类型、字段顺序,语义对应关系代数的Union,只是关系代数是Set集合操作,会有去重复操作,UNION ALL 不进行去重,如下所示:
SQL 示例我们简单的将customer_tab查询2次,将查询结果合并起来,如下: [mw_shl_code=sql,true]SELECT c_id, c_name, c_desc FROM customer_tab
UNION ALL
SELECT c_id, c_name, c_desc FROM customer_tab[/mw_shl_code] Resultc_idc_namec_descc_001Kevinfrom JinLinc_002Sunnyfrom JinLinc_003JinChengfrom HeBeic_001Kevinfrom JinLinc_002Sunnyfrom JinLinc_003JinChengfrom HeBei 特别说明UNION ALL 对结果数据不进行去重,如果想对结果数据进行去重,传统数据库需要进行UNION操作。
(6)UNIONUNION 将两个流给合并起来,要求两个流的字段完全一致,包括字段类型、字段顺序,并其UNION 不同于UNION ALL,UNION会对结果数据去重,与关系代数的Union语义一致,如下:
SQL 示例我们简单的将customer_tab查询2次,将查询结果合并起来,如下: [mw_shl_code=sql,true]SELECT c_id, c_name, c_desc FROM customer_tab
UNION
SELECT c_id, c_name, c_desc FROM customer_tab[/mw_shl_code] 我们发现完全一样的表数据进行 UNION之后,数据是被去重的,UNION之后的数据并没有增加。 Resultc_idc_namec_descc_001Kevinfrom JinLinc_002Sunnyfrom JinLinc_003JinChengfrom HeBei 特别说明UNION 对结果数据进行去重,在实际的实现过程需要对数据进行排序操作,所以非必要去重情况请使用UNION ALL操作。
最新经典文章,欢迎关注公众号 下一篇: Apache-Flink深度解析-SQL概览(下) http://www.aboutyun.com/forum.php?mod=viewthread&tid=26909
来源:知乎 作者:王知无 原文:《Apache-Flink深度解析-SQL概览》 https://zhuanlan.zhihu.com/p/59772928
|