分享

Flink系统学习32-4:Table API详解:窗口定义详解

pig2 2019-3-6 11:58:52 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 6639
问题导读

1.翻滚窗口如何定义时间长度?
2.关键字on一般是定义什么?
3.滑动窗口都是如何定义长度、间隔?
4.会话窗口间隔如何定义的?

上一篇:
Flink系统学习32-3:Table API详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26797

支持的窗口定义如下所示。
Tumble (翻滚窗口)
翻滚窗口将行分配给固定长度的非重叠连续窗口。 例如,5分钟的翻滚窗口以5分钟为间隔对行进行分组。 可以在事件时间,处理时间或行数上定义翻滚窗口。使用Tumble类定义翻滚窗口如下:

方法
描述
over  定义窗口的长度,可以是时间间隔也可以是行数间隔。【评注:我们知道这里over主要起到定义的作用】
on时间属性为组(时间间隔)或排序(行计数)。 对于批处理查询,这可能是任何Long或Timestamp属性。 对于流式查询,这必须是声明的事件时间或处理时间属性。
as为窗口指定别名。 别名用于引用以下groupBy()子句中的窗口,并可选择在select()子句中选择窗口属性,如window start,end或rowtime timestamp。

[mw_shl_code=scala,true]// Tumbling Event-time Window
.window(Tumble over 10.minutes on 'rowtime as 'w)

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.minutes on 'proctime as 'w)

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.rows on 'proctime as 'w)[/mw_shl_code]
[mw_shl_code=java,true]// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"));

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));[/mw_shl_code]

Slide (滑动窗口)
滑动窗口具有固定大小,并按指定的滑动间隔滑动。 如果滑动间隔小于窗口大小,则滑动窗口重叠。 因此,可以将行分配给多个窗口。 例如,15分钟大小和5分钟滑动间隔的滑动窗口将每行分配给3个不同的15分钟大小的窗口,这些窗口以5分钟的间隔进行调用。 可以在事件时间,处理时间或行数上定义滑动窗口。

使用Slide类定义滑动窗口如下:

方法
描述
over定义窗口的长度,可以是时间或行计数间隔。
every定义滑动间隔,可以是时间间隔也可以是行数。 滑动间隔必须与大小间隔的类型相同。
on时间属性为组(时间间隔)或排序(行计数)。 对于批处理查询,这可能是任何Long或Timestamp属性。 对于流式查询,这必须是声明的事件时间或处理时间属性
as为窗口指定别名。 别名用于引用以下groupBy()子句中的窗口,并可选择在select()子句中选择窗口属性,如window start,end或rowtime timestamp。

[mw_shl_code=java,true]// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));[/mw_shl_code]
[mw_shl_code=Scala,true]// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on 'proctime as 'w)[/mw_shl_code]

Session (会话窗口)
会话窗口没有固定的大小,但是它们的边界由不活动的间隔定义,即,如果在定义的间隙期间没有出现事件,则会话窗口关闭。 例如,在30分钟不活动后观察到一行时,会有一个30分钟间隙的会话窗口(否则该行将被添加到现有窗口中),如果在30分钟内未添加任何行,则会关闭。 会话窗口可以在事件时间或处理时间上工作。

使用Session类定义会话窗口,如下所示:

方法
描述
withGap将两个窗口之间的间隔定义为时间间隔。
on时间属性为组(时间间隔)或排序(行计数)。 对于批处理查询,这可能是任何Long或Timestamp属性。 对于流式查询,这必须是声明的事件时间或处理时间属性。
as为窗口指定别名。 别名用于引用以下groupBy()子句中的窗口,并可选择在select()子句中选择窗口属性,如window start,end或rowtime timestamp。

[mw_shl_code=scala,true]// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on 'proctime as 'w)[/mw_shl_code]

[mw_shl_code=java,true]// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"));

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap("10.minutes").on("proctime").as("w"));[/mw_shl_code]



最新经典文章,欢迎关注公众号



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条