分享

彻底明白Flink系统学习29-1:【Flink1.7】流概念之模式检测

pig2 2019-1-28 20:07:23 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 7413
问题导读

1.Flink事件流如何匹配?
2.MATCH_RECOGNIZE子句的作用是什么?
3.如何才能使用MATCH_RECOGNIZE子句?
4.MATCH_RECOGNIZE包含那些子句?
5.事件顺序允许哪些时间搜索模式?
6.Greedy 和Reluctant量词的含义是什么?



搜索一组事件模式是一种常见的用例,尤其是在数据流的情况下。 Flink附带了一个复杂事件处理(CEP)库,允许在事件流中进行模式检测。 此外,Flink的SQL API提供了一种表达查询的关系方式,该方法具有大量内置函数和基于规则的优化,可以直接使用。

2016年12月,国际标准化组织(ISO)发布了SQL标准的新版本,其中包括SQL中的行模式识别(ISO / IEC TR 19075-5:2016)。 它允许Flink使用MATCH_RECOGNIZE子句整合CEP和SQL API,以便在SQL中进行复杂事件处理。

MATCH_RECOGNIZE子句启用以下任务:
  • 数据的逻辑分区和排序使用PARTITION BY和ORDER BY子句。
  • 定义row行模式匹配搜索使用PATTERN子句。这些匹配使用的语法类似正则表达式
  • 行模式变量的逻辑组件在DEFINE子句中指定。
  • 在MEASURES子句中定义measures,measures是SQL查询的其他部分中可用的表达式。


以下示例说明了基本模式识别的语法:
[mw_shl_code=sql,true]SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
  PARTITION BY userid
  ORDER BY proctime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    C.id AS cid
  PATTERN (A B C)
  DEFINE
    A AS name = 'a',
    B AS name = 'b',
    C AS name = 'c'
) AS T[/mw_shl_code]
本文将更详细地解释每个关键字,并将说明更复杂的示例。

注意:Flink实现MATCH_RECOGNIZE子句是完整标准的一个子集。 仅支持以下部分功能。 由于开发仍处于早期阶段,同时了解已知的局限性。

介绍和实例

安装指南
模式识别功能在内部使用Apache Flink的CEP库。 为了能够使用MATCH_RECOGNIZE子句,需要将库作为依赖项添加到Maven项目中。
[mw_shl_code=xml,true]<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.7.0</version>
</dependency>[/mw_shl_code]
或者,也可以将依赖项添加到集群类路径

如果要在SQL客户端中使用MATCH_RECOGNIZE子句,不必执行任何操作,默认情况下包含所有依赖项。

SQL语义
每个MATCH_RECOGNIZE查询都包含以下子句:
  • PARTITION BY  - 定义表的逻辑分区; 类似于GROUP BY操作。
  • ORDER BY  - 指定如何排序行; 这是必不可少的,因为模式取决于排序。
  • MEASURES - 定义子句的输出; 类似于SELECT子句。
  • ONE ROW PER MATCH  - 输出模式,定义每个匹配应生成多少行。
  • AFTER MATCH SKIP  - 指定下一个匹配的开始位置; 这也是一种控制单个事件属于多少个不同匹配的方法。
  • PATTERN  - 允许使用类似正则表达式的语法构造将要搜索的模式。
  • DEFINE  - 此部分定义(匹配)模式变量必须满足的条件。

注意目前,MATCH_RECOGNIZE子句只能应用于追加表。 此外,它总是生成一个追加表。

例子:
对于我们的示例,我们假设已经注册了表格Ticker。 该表包含特定时间点的股票价格。

该表具有以下schema:
[mw_shl_code=sql,true]Ticker
     |-- symbol: String                           # symbol of the stock
     |-- price: Long                              # price of the stock
     |-- tax: Long                                # tax liability of the stock
     |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # point in time when the change to those values happened[/mw_shl_code]
【评注:从上面我们可以看到schema的定义】

为简化起见,我们仅考虑单个股票ACME的输入数据。 ticker看起来类似于下表,其中连续追加行。
[mw_shl_code=sql,true]symbol         rowtime         price    tax
======  ====================  ======= =======
'ACME'  '01-Apr-11 10:00:00'   12      1
'ACME'  '01-Apr-11 10:00:01'   17      2
'ACME'  '01-Apr-11 10:00:02'   19      1
'ACME'  '01-Apr-11 10:00:03'   21      3
'ACME'  '01-Apr-11 10:00:04'   25      2
'ACME'  '01-Apr-11 10:00:05'   18      1
'ACME'  '01-Apr-11 10:00:06'   15      1
'ACME'  '01-Apr-11 10:00:07'   14      2
'ACME'  '01-Apr-11 10:00:08'   24      2
'ACME'  '01-Apr-11 10:00:09'   25      2
'ACME'  '01-Apr-11 10:00:10'   19      1[/mw_shl_code]
现在的任务是找到单个股票价格不断下降的时期。 为此,可以编写如下查询:
[mw_shl_code=sql,true]SELECT *
FROM Ticker
MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY rowtime
    MEASURES
        START_ROW.rowtime AS start_tstamp,
        LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
        LAST(PRICE_UP.rowtime) AS end_tstamp
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST PRICE_UP
    PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
    DEFINE
        PRICE_DOWN AS
            (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
        PRICE_UP AS
            PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;[/mw_shl_code]
查询按symbol列对Ticker表进行分区,并按rowtime属性对其进行排序。

PATTERN子句指定模式,使用开始事件START_ROW,该模式后跟一个或多个PRICE_DOWN事件,并以PRICE_UP事件结束。 如果可以找到这样的模式,则将在最后的PRICE_UP事件中寻找下一个模式匹配,如AFTER MATCH SKIP TO LAST子句所示。

DEFINE子句需要指定条件满足PRICE_DOWN 和PRICE_UP 事件。虽然START_ROW模式变量不存在,但它具有一个隐式条件,该条件总是被评估为TRUE。

模式变量PRICE_DOWN定义为具有price的行,它小于最后一行的price,则满足PRICE_DOWN条件。 对于初始情况或当没有满足PRICE_DOWN条件的最后一行时,行的price应小于模式中前一行的价格(由START_ROW引用)。

模式变量PRICE_UP定义为price行,它大于最后一行的price,则满足PRICE_DOWN条件。

此查询为每个期间生成一个汇总行,其中股票的价格不断下降。
输出行由查询中MEASURES部分定义。 输出行数由ONE ROW PER MATCH输出模式定义。
[mw_shl_code=bash,true] symbol       start_tstamp       bottom_tstamp         end_tstamp
=========  ==================  ==================  ==================
ACME       01-APR-11 10:00:04  01-APR-11 10:00:07  01-APR-11 10:00:08[/mw_shl_code]
结果行描述了从01-APR-11 10:00:04开始的价格下跌期,并在01-APR-11 10:00:07达到最低价格,在01-APR-11 10:00再次上涨:08。

分区
可以在分区数据中查找模式,例如,单个股票或特定用户的趋势。 这可以使用PARTITION BY子句表示。 该子句类似于使用GROUP BY进行聚合。

注意:注意强烈建议对传入数据进行分区,否则MATCH_RECOGNIZE子句将被转换为非并行operator以确保全局排序。

事件顺序
Apache Flink允许根据时间搜索模式; 处理时间或事件时间。

事件时间的情况下,事件会被排序,在传入内部匹配状态机之前。因此,无论行追加到表的顺序如何,生成的输出都是正确的。按照每行中包含的时间指定的顺序的评估模式。

MATCH_RECOGNIZE子句假定时间属性具有升序排序作为ORDER BY子句的第一个参数。

对于示例Ticker表,定义如ORDER BY rowtime 升序,price 降序有效,但ORDER BY price, rowtime 或则ORDER BY rowtime 降序, price 升序无效

Define 和Measures
DEFINE和MEASURES关键字与简单SQL查询中的WHERE和SELECT子句具有相似的含义。

MEASURES子句定义将包含在匹配模式的输出中的内容。 它可以投影列并定义表达式以进行评估。 生成的行数取决于输出模式设置。

DEFINE子句指定行必须满足的条件才能被分类到相应的模式变量。 如果未为模式变量定义条件,则将使用默认条件,该条件对于每一行的计算结果为true。

有关可在这些子句中使用的表达式的更详细说明,下面详细介绍(模式导航)。


定义模式
MATCH_RECOGNIZE子句允许用户使用功能强大且富有表现力的语法搜索事件流中的模式,该语法与广泛使用的正则表达式语法类似。

每个模式都是由基本构建块构建的,称为模式变量,可以应用operators(量词和其他修饰符)。 整个模式必须括在括号中。

示例模式可能如下所示:
[mw_shl_code=bash,true]PATTERN (A B+ C* D)
[/mw_shl_code]

可以使用以下运算符:
连接 - 像(A B)这样的模式意味着A和B之间的连续性是严格的。因此,不存在未映射到A或B之间的行。
量词 - 修改可以映射到模式变量的行数。
*  -  0行或更多行
+  -  1行或更多行
?  -  0或1行
{n}  - 正好是n行(n> 0)
{n,}  -  n行或更多行(n≥0)
{n,m}  -  n和m(含)行之间(0≤n≤m,0 <m)
{,m}  - 介于0和m(含)行之间(m> 0)

注意不支持可能产生空匹配的模式。 这种模式的示例是PATTERN(A *),PATTERN(A?B *),PATTERN(A {0,} B {0,} C *)等。
Greedy 和Reluctant量词


【说明:这里目前还有待验证】
每个量词可以是Greedy (贪婪)的(默认行为)或Reluctant(不情愿)的。 Greedy (贪心量词)试图匹配尽可能多的行,而Reluctant(不情愿的)量词试图尽可能少地匹配。

为了说明差异,可以使用查询查看以下示例,其中将Greedy(贪心)量词应用于B变量:
[mw_shl_code=sql,true]SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            C.price AS lastPrice
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B* C)
        DEFINE
            A AS A.price > 10,
            B AS B.price < 15,
            C AS C.price > 12
    )[/mw_shl_code]

鉴于我们有以下输入:
[mw_shl_code=bash,true] symbol  tax   price          rowtime
======= ===== ======== =====================
XYZ     1     10       2018-09-17 10:00:02
XYZ     2     11       2018-09-17 10:00:03
XYZ     1     12       2018-09-17 10:00:04
XYZ     2     13       2018-09-17 10:00:05
XYZ     1     14       2018-09-17 10:00:06
XYZ     2     16       2018-09-17 10:00:07[/mw_shl_code]

上面的模式将产生以下输出:
[mw_shl_code=bash,true] symbol   lastPrice
======== ===========
XYZ      16[/mw_shl_code]

将B *修改为B *?的同一查询,这意味着B *应该是Reluctant(不情愿的),将产生:
[mw_shl_code=bash,true] symbol   lastPrice
======== ===========
XYZ      13
XYZ      16[/mw_shl_code]

模式变量B仅匹配价格为12的行,而不是吞噬价格为12,13和14的行。

注意不可能将(greedy)贪婪量词用于模式的最后一个变量。 因此,不允许像(A B *)那样的模式。 通过引入具有B的否定条件的人工状态(例如C),可以很容易地解决这个问题。因此,可以使用如下查询:
[mw_shl_code=bash,true]PATTERN (A B* C)
DEFINE
    A AS condA(),
    B AS condB(),
    C AS NOT condB()[/mw_shl_code]
注意目前不支持可选的不情愿量词(A ??或A {0,1}?)。


最新经典文章,欢迎关注公众号




本帖被以下淘专辑推荐:

已有(2)人评论

跳转到指定楼层
jiangzi 发表于 2019-1-30 00:02:48
【Flink1.7】流概念之模式检测~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条