问题导读
1.为何会有Flink SQL?
2.本文哪些地方涉及Flink 1.7?
4.如何定义源(sources )和接收器(sinks)?
5.Flink SQL本文介绍了哪些sql?
6.将数据格式化为正确的格式以便进一步处理?
7.如何监控Flink sql查询
8.使用Flink SQL中的视图的作用是什么?
9.本文使用Flink sql实现了什么案例?
关注最新经典文章,欢迎关注公众号
相关资料
Flink sql入门三大新文档【2018】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26311
虽然Flink SQL最初于2016年8月与Flink 1.1.0一起发布,但最近的Flink版本增加了相当多的功能,使Flink SQL更易于使用,无需会编写Java / Scala代码。 在这篇文章中,我们希望(重新)从这些变化所带来的新角度介绍Flink SQL,同时为经验丰富的用户提供一些额外的知识。
新添加的SQL命令行(SQL CLI)可以轻松快速浏览流中的数据或静态数据(例如,在数据库或HDFS中)。 它还可用于构建功能强大的数据转换管道或分析管道。 在这篇文章中,我们想要探索当前可用的功能,而后续文章将更详细地介绍特定功能,并介绍Flink 1.7即将推出的令人兴奋的新功能,例如使用MATCH_RECOGNIZE扩展的复杂event处理和改进 基于时间的enrichment(富集) join。
在我们深入研究一些实践实例之前,我们列出了Flink SQL的一些亮点:
- Flink SQL是批处理和流处理的统一API:这允许使用相同的查询来处理历史数据和实时数据
- 支持处理时间和事件时间语义
- 支持使用嵌套的Avro和JSON数据
- 用户定义的scalar,聚合和表值(table-valued)函数
- 无需编码的SQL命令行(即没有Java / Scala编码)
- 支持各种类型的流连接
- 支持聚合,包括窗口和没有窗口
定义 Sources 和Sinks
使用Flink SQL的命令行客户端时,我们要做的第一件事就是定义源(sources )和接收器(sinks)。 否则,我们将无法读取或写入任何数据。 源和接收器在YAML配置文件中定义,以及其他配置设置。 YAML文件中的源和接收器配置类似于SQL DDL语句(Flink社区目前正在讨论对SQL DDL的支持)。 对于我们正在进行的示例,我们假设我们有一个Kafka主题(topic),其中存储了我们想要进一步处理和分析的出租车游乐设施的信息。 它的配置如下所示:
- tables:
- - name: TaxiRides
- type: source
- update-mode: append
- schema:
- - name: rideId
- type: LONG
- - name: rowTime
- type: TIMESTAMP
- rowtime:
- timestamps:
- type: "from-field"
- from: "rideTime"
- watermarks:
- type: "periodic-bounded"
- delay: "60000"
- - name: isStart
- type: BOOLEAN
- - name: lon
- type: FLOAT
- - name: lat
- type: FLOAT
- - name: taxiId
- type: LONG
- - name: driverId
- type: LONG
- - name: psgCnt
- type: INT
- connector:
- property-version: 1
- type: kafka
- version: 0.11
- topic: TaxiRides
- startup-mode: earliest-offset
- properties:
- - key: zookeeper.connect
- value: zookeeper:2181
- - key: bootstrap.servers
- value: kafka:9092
- - key: group.id
- value: testGroup
- format:
- property-version: 1
- type: json
- schema: "ROW(rideId LONG, isStart BOOLEAN, rideTime TIMESTAMP, lon FLOAT, lat FLOAT, psgCnt INT, taxiId LONG, driverId LONG)"
复制代码
在Flink SQL中,源,接收器以及介于两者之间的所有内容称为表。 在这里,我们基于包含JSON格式的事件的Kafka主题定义初始表。 我们定义Kafka配置设置,格式以及我们如何将其映射到模式,以及我们希望如何从数据中导出watermarks 。 除了JSON之外,Flink SQL还内置了对CSV和Avro格式的支持,并且还可以使用自定义格式对其进行扩展。 Flink SQL始终支持在JSON和Avro架构中处理嵌套数据。
Flink SQL的使用
现在我们讨论了源表的配置和格式,下面我们说说 Flink SQL的使用
从Flink SQL命令行客户端,我们可以列出我们定义的表:
- Flink SQL> SHOW TABLES;
- TaxiRides
- TaxiRides_Avro
复制代码
我们还可以检查任何表的schema :
- Flink SQL> DESCRIBE TaxiRides;
- root
- |-- rideId: Long
- |-- rowTime: TimeIndicatorTypeInfo(rowtime)
- |-- isStart: Boolean
- |-- lon: Float
- |-- lat: Float
- |-- taxiId: Long
- |-- driverId: Long
- |-- psgCnt: Integer
复制代码
有了这个,让我们看看我们可以用我们的表做什么。
有关配置Flink SQL以及定义源,接收器及其格式的详细信息,请参阅文档(https://ci.apache.org/projects/f ... l#environment-files)。
格式化数据
我们可能想要做的最简单的事情之一是将数据格式化为正确的格式以便进一步处理。 这可能包括:
- 在schema之间转换,例如将JSON事件流转换为Avro编码
- 用SQL语句中删除字段或将其投影
- 过滤掉我们不感兴趣的整个事件(events )
让我们看一下从架构转换开始我们将如何做到这些。 当我们想要从Kafka读取数据时,将数据转换为不同的格式,并将数据写回不同的Kafka主题以进行下游处理,我们所要做的就是定义源表(如上所述)然后定义 作为接收器的表格具有不同的格式:
- tables:
- - name: TaxiRides_Avro0
- type: sink
- update-mode: append
- schema:
- - name: rideId
- type: LONG
- - name: rowTime
- type: TIMESTAMP
- - name: isStart
- type: BOOLEAN
- - name: lon
- type: FLOAT
- - name: lat
- type: FLOAT
- - name: taxiId
- type: LONG
- - name: driverId
- type: LONG
- - name: psgCnt
- type: INT
- connector:
- property-version: 1
- type: kafka
- version: 0.11
- topic: TaxiRides_Avro
- properties:
- - key: zookeeper.connect
- value: zookeeper:2181
- - key: bootstrap.servers
- value: kafka:9092
- - key: group.id
- value: trainingGroup
- format:
- property-version: 1
- type: avro
- avro-schema: >
- {
- "type": "record",
- "name": "test",
- "fields" : [
- {"name": "rideId", "type": "long"},
- {"name": "rowTime", "type": {"type": "long", "logicalType": "timestamp-millis"}},
- {"name": "isStart", "type": "boolean"},
- {"name": "lon", "type": "float"},
- {"name": "lat", "type": "float"},
- {"name": "taxiId", "type": "long"},
- {"name": "driverId", "type": "long"},
- {"name": "psgCnt", "type": "int"}
- ]
- }
复制代码
通过我们定义的源和接收器转换数据变得如此简单:
- Flink SQL> INSERT INTO TaxiRides_Avro SELECT * FROM TaxiRides;
- [INFO] Submitting SQL update statement to the cluster...
- [INFO] Table update statement has been successfully submitted to the cluster:
- Cluster ID: StandaloneClusterId
- Job ID: ffa9109b9cad077ec83137f55ec6d1c5
- Web interface: http://jobmanager:8081
复制代码
我们的查询作为常设查询提交给Flink集群。可以通过访问http://localhost:8081来监视和控制来自Flink的WebUI的查询。
我们可以通过引入(projection)投影和(filtering)过滤来构建这个简单的模式。 如果我们只想在结果中包含某些字段,我们可以在SELECT查询中指定。 例如:
- Flink SQL> INSERT INTO TaxiRides_Avro SELECT rideIdId, taxiId, driverId FROM TaxiRides;
复制代码
这只会给我们events中的ID。 (请记住,需要调整接收器的格式才能使此查询起作用。)
基于此,我们可以做的另一件简单事情就是过滤掉整个事件。 考虑一下我们只对在某个城市发生的出租车乘坐感兴趣的情况。 事件具有lon和lat字段,分别给出事件发生的经度和纬度。 我们可以使用它们来确定事件是否发生在某个城市:
- Flink SQL> SELECT * FROM TaxiRides WHERE isInNYC(lon, lat);
复制代码
你可能会注意到,那就是isInNYC()。 这是我们在SQL客户端配置中定义的用户定义函数或UDF。 我们可以通过以下方式查看我们提供的用户功能:
- Flink SQL> SHOW FUNCTIONS;
- timeDiff
- toCoords
- isInNYC
- toAreaId
复制代码
就像在Flink SQL客户端配置文件中配置的其他内容一样:
- functions:
- - name: timeDiff
- from: class
- class: com.dataartisans.udfs.TimeDiff
- - name: isInNYC
- from: class
- class: com.dataartisans.udfs.IsInNYC
- - name: toAreaId
- from: class
- class: com.dataartisans.udfs.ToAreaId
- - name: toCoords
- from: class
- class: com.dataartisans.udfs.ToCoords
复制代码
UDF是实现特定接口并在客户端注册的Java类。 有不同类型的用户功能:(scalar )标量函数,表函数和聚合函数。 其中详细介绍了用户定义的函数,可以查看UDF文档。
使用Flink SQL中的视图构建查询
一旦我们有足够复杂的SQL查询,它们就会变得有点难以理解。 我们可以通过在Flink SQL中定义视图来缓解这种情况。 这类似于在编程语言中定义变量以给出某个名称的方式,以便以后能够重用它。 假设我们想要在早期的例子的基础上进行构建,并创建一个在给定日期之后在某个城市发生的游乐设施的视图。 我们会这样做:
- Flink SQL> CREATE VIEW TaxiRides_NYC AS SELECT * FROM TaxiRides
- WHERE isInNYC(lon, lat)
- AND rowTime >= TIMESTAMP '2013-01-01 00:00:00';
- [INFO] View has been created.
复制代码
我们可以通过以下方式找出视图:
- Flink SQL> SHOW TABLES;
- TaxiRides
- TaxiRides_Avro
- TaxiRides_NYC
复制代码
需要注意的一点是,创建视图实际上并不实例化任何常设查询或产生任何输出或中间结果。 视图只是可以重用的查询的逻辑名称,并允许更好地构建查询。 这与其他一些类似SQL的流式系统不同,在这些系统中,每个中间查询都会创建数据并使用资源。
视图是Flink 1.7的即将推出的功能,但它已经实现并合并到主分支中( master branch),这就是为什么我们已经在这里提到它。 另外,它非常有用。
基于事件时间的窗口化聚合
作为最后一步,我们希望展示一个更复杂的查询,它将我们到目前为止所解释的内容汇集在一起。 考虑一种情况,我们希望监控正在发生的游乐设施,并且需要知道某个城市某个特定区域的游乐设施数量何时超过阈值(比如说5)。 这是这样做的查询:
- SELECT
- toAreaId(lon, lat) AS area,
- TUMBLE_END(rowTime, INTERVAL '5' MINUTE) AS t,
- COUNT(*) AS c
- FROM TaxiRides_NYC
- WHERE isStart = TRUE
- GROUP BY
- toAreaId(lon, lat),
- TUMBLE(rowTime, INTERVAL '5' MINUTE)
- HAVING COUNT(*) >= 5;
复制代码
在上面的示例中,我们执行以下操作:
- 我们使用之前创建的视图,其中包含在特定日期之后发生的某个城市的事件,
- 我们过滤掉那些不是“开始事件”的事件,
- 我们使用另一个用户定义的函数将lon,lat对转换为区域id和group by,
- 我们指定我们想要有五分钟的窗口,最后
- 我们过滤掉那些计数小于5的窗口。
在现实世界的用例中,我们现在将其写入Elasticsearch接收器并使用它为仪表板或通知系统供电。留给大家思考。
总结
在这篇博文中,我们解释了如何在不编写Java代码的情况下使用Flink SQL实现简单的数据转换和数据Massaging作业。 我们还解释了如何使用视图来构建更复杂的查询并使其易于理解。 最后,我们开发了一个更复杂的查询,它结合了用户定义的函数,窗口聚合和事件时间支持。
在后续文章中,我们将提供有关如何开发和使用用户定义函数的更多内容,我们将深入了解Flink SQL的强大连接以及如何使用它们来丰富数据。 在Flink 1.7.0发布之后使用Flink SQL的数据丰富,复杂事件处理和模式检测引入强大的新增功能。
相关文章:
Apache Flink中Savepoints和Checkpoints之间的3个区别
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26299
确定Apache Flink集群规模时需要考虑的6件事
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25599
Apache Flink如何管理Kafka消费者offsets
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25592
Apache Flink:开发经验总结
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25558
Flink状态流处理:State Backends三种方式详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26230
Flink Watermarks【水位线】详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25615
Flink使用场景-生产环境【适合架构师、面试者】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25634
即将到来的Flink1.7中迎来新的Kafka 连接器
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25610
Flink实时性、容错机制、窗口等介绍
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25540
Apache Flink:详细入门
http://www.aboutyun.com/forum.php?mod=viewthread&tid=18491
阿里巴巴搜索中为何使用Flink及如何实践介绍
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21047
|