问题导读
1.翻滚窗口如何定义时间长度?
2.关键字on一般是定义什么?
3.滑动窗口都是如何定义长度、间隔?
4.会话窗口间隔如何定义的?
上一篇:
Flink系统学习32-3:Table API详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26797
支持的窗口定义如下所示。
Tumble (翻滚窗口)
翻滚窗口将行分配给固定长度的非重叠连续窗口。 例如,5分钟的翻滚窗口以5分钟为间隔对行进行分组。 可以在事件时间,处理时间或行数上定义翻滚窗口。使用Tumble类定义翻滚窗口如下:
方法 | 描述 | over | 定义窗口的长度,可以是时间间隔也可以是行数间隔。【评注:我们知道这里over主要起到定义的作用】 | on | 时间属性为组(时间间隔)或排序(行计数)。 对于批处理查询,这可能是任何Long或Timestamp属性。 对于流式查询,这必须是声明的事件时间或处理时间属性。 | as | 为窗口指定别名。 别名用于引用以下groupBy()子句中的窗口,并可选择在select()子句中选择窗口属性,如window start,end或rowtime timestamp。 |
[mw_shl_code=scala,true]// Tumbling Event-time Window
.window(Tumble over 10.minutes on 'rowtime as 'w)
// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.minutes on 'proctime as 'w)
// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.rows on 'proctime as 'w)[/mw_shl_code]
[mw_shl_code=java,true]// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"));
// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));
// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));[/mw_shl_code]
Slide (滑动窗口)
滑动窗口具有固定大小,并按指定的滑动间隔滑动。 如果滑动间隔小于窗口大小,则滑动窗口重叠。 因此,可以将行分配给多个窗口。 例如,15分钟大小和5分钟滑动间隔的滑动窗口将每行分配给3个不同的15分钟大小的窗口,这些窗口以5分钟的间隔进行调用。 可以在事件时间,处理时间或行数上定义滑动窗口。
使用Slide类定义滑动窗口如下:
方法 | 描述 | over | 定义窗口的长度,可以是时间或行计数间隔。 | every | 定义滑动间隔,可以是时间间隔也可以是行数。 滑动间隔必须与大小间隔的类型相同。 | on | 时间属性为组(时间间隔)或排序(行计数)。 对于批处理查询,这可能是任何Long或Timestamp属性。 对于流式查询,这必须是声明的事件时间或处理时间属性。 | as | 为窗口指定别名。 别名用于引用以下groupBy()子句中的窗口,并可选择在select()子句中选择窗口属性,如window start,end或rowtime timestamp。 |
[mw_shl_code=java,true]// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));
// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));
// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));[/mw_shl_code]
[mw_shl_code=Scala,true]// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on 'proctime as 'w)[/mw_shl_code]
Session (会话窗口)
会话窗口没有固定的大小,但是它们的边界由不活动的间隔定义,即,如果在定义的间隙期间没有出现事件,则会话窗口关闭。 例如,在30分钟不活动后观察到一行时,会有一个30分钟间隙的会话窗口(否则该行将被添加到现有窗口中),如果在30分钟内未添加任何行,则会关闭。 会话窗口可以在事件时间或处理时间上工作。
使用Session类定义会话窗口,如下所示:
方法 | 描述 | withGap | 将两个窗口之间的间隔定义为时间间隔。 | on | 时间属性为组(时间间隔)或排序(行计数)。 对于批处理查询,这可能是任何Long或Timestamp属性。 对于流式查询,这必须是声明的事件时间或处理时间属性。 | as | 为窗口指定别名。 别名用于引用以下groupBy()子句中的窗口,并可选择在select()子句中选择窗口属性,如window start,end或rowtime timestamp。 |
[mw_shl_code=scala,true]// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)
// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on 'proctime as 'w)[/mw_shl_code]
[mw_shl_code=java,true]// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"));
// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap("10.minutes").on("proctime").as("w"));[/mw_shl_code]
最新经典文章,欢迎关注公众号
|
|