|
Trident 的核心数据模型是“流”(Stream),不过与普通的拓扑不同的是,这里的流是作为一连串 batch 来处理的。流是分布在集群中的不同节点上运行的,并且对流的操作也是在流的各个 partition 上并行运行的。
Trident 中有 5 类操作:
本地分区操作是在每个分区块上独立运行的操作,其中不涉及网络数据传输。
函数函数负责接收一个输入域的集合并选择输出或者不输出 tuple。输出 tuple 的域会被添加到原始数据流的输入域中。如果一个函数不输出 tuple,那么原始的输入 tuple 就会被直接过滤掉。否则,每个输出 tuple 都会复制一份输入 tuple 。假设你有下面这样的函数:
再假设你有一个名为 “mystream” 的数据流,这个流中包含下面几个 tuple,每个 tuple 中包含有 “a”、“b”、“c” 三个域:
[1, 2, 3] [4, 1, 6] [3, 0, 8]如果你运行这段代码:
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))那么最终输出的结果 tuple 就会包含有 “a”、“b”、“c”、“d” 4 个域,就像下面这样:
[1, 2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0]
过滤器
过滤器负责判断输入的 tuple 是否需要保留。以下面的过滤器为例:
通过使用这段代码:
就可以将下面这样带有 “a”、“b”、“c” 三个域的 tuple
[1, 2, 3] [2, 1, 1] [2, 3, 4]最终转化成这样的结果 tuple:
[2, 1, 1] partitionAggregatepartitionAggregate 会在一批 tuple 的每个分区上执行一个指定的功能操作。与上面的函数不同,由 partitionAggregate发送出的 tuple 会将输入 tuple 的域替换。以下面这段代码为例:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))假如输入流中包含有 “a”、“b” 两个域并且有以下几个 tuple 块:
Partition 0: ["a", 1] ["b", 2] Partition 1: ["a", 3] ["c", 8] Partition 2: ["e", 1] ["d", 9] ["d", 10]经过上面的代码之后,输出就会变成带有一个名为 “sum” 的域的数据流,其中的 tuple 就是这样的:
Partition 0: [3] Partition 1: [11] Partition 2: [20]Storm 有三个用于定义聚合器的接口:CombinerAggregator,ReducerAggregator 以及 Aggregator。
这是 CombinerAggregator 接口:
CombinerAggregator 会将带有一个域的一个单独的 tuple 返回作为输出。CombinerAggregator 会在每个输入 tuple 上运行初始化函数,然后使用组合函数来组合所有输入的值。如果在某个分区中没有 tuple, CombinerAggregator 就会输出zero 方法的结果。例如,下面是 Count 的实现代码:
如果你使用 aggregate 方法来代替 partitionAggregate 方法,你就会发现 CombinerAggregator 的好处了。在这种情况下,Trident 会在发送 tuple 之前通过分区聚合操作来优化计算过程。
ReducerAggregator 的接口实现是这样的:
ReducerAggregator 会使用 init 方法来产生一个初始化的值,然后使用该值对每个输入 tuple 进行遍历,并最终生成并输出一个单独的 tuple,这个 tuple 中就包含有我们需要的计算结果值。例如,下面是将 Count 定义为 ReducerAggregator 的代码:
ReducerAggregator 同样可以用于 persistentAggregate,你会在后面看到这一点。
最常用的聚合器接口还是下面的 Aggregator 接口:
Aggregator 聚合器可以生成任意数量的 tuple,这些 tuple 也可以带有任意数量的域。聚合器可以在执行过程中的任意一点输出tuple,他们的执行过程是这样的:
下面是使用 Count 作为聚合器的代码:
有时你可能会需要同时执行多个聚合操作。这个过程叫做链式处理,可以使用下面这样的代码来实现:
这段代码会在每个分区上分别执行 Count 和 Sum 聚合器,而输出中只会包含一个带有 [“count”, “sum”] 域的单独的 tuple。
stateQuery 与 partitionPersiststateQuery 与 partitionPersist 会分别查询、更新 state 数据源。你可以参考 Trident State 文档 来了解如何使用它们。
projectionprojection 方法只会保留操作中指定的域。如果你有一个带有 [“a”, “b”, “c”, “d”] 域的数据流,通过执行这段代码:
就会使得输出数据流中只包含有 [“b”, “d”] 域。
重分区操作重分区操作会执行一个用来改变在不同的任务间分配 tuple 的方式的函数。在重分区的过程中分区的数量也可能会发生变化(例如,重分区之后的并行度就有可能会增大)。重分区会产生一定的网络数据传输。下面是重分区操作的几个函数:
Trident 使用 aggregate 方法和 persistentAggregate 方法来对数据流进行聚类操作。其中,aggregate 方法会分别对数据流中的每个 batch 进行处理,而 persistentAggregate 方法则会对数据流中的所有 batch 执行聚类处理,并将结果存入某个 state 中。
在数据流上执行 aggregate 方法会执行一个全局的聚类操作。在你使用 ReducerAggregator 或者 Aggregator 时,数据流首先会被重新分区成一个单独的分区,然后聚类函数就会在该分区上执行操作。而在你使用 CombinerAggregator 时,Trident 首先会计算每个分区的部分聚类结果,然后将这些结果重分区到一个单独的分区中,最后在网络数据传输完成之后结束这个聚类过程。CombinerAggregator 比其他的聚合器的运行效率更高,在聚类时应该尽可能使用CombinerAggregator。
下面是一个使用 aggregate 来获取一个 batch 的全局计数值的例子:
与 partitionAggregate 一样,aggregate 的聚合器也可以进行链式处理。然而,如果你在一个处理链中同时使用了CombinerAggregator 和非 CombinerAggregator,Trident 就不能对部分聚类操作进行优化了。
想要了解更多使用 persistentAggregate 的方法,可以参考 Trident State 文档 一文。
对分组数据流的操作通过对指定的域执行 partitionBy 操作,groupBy 操作可以将数据流进行重分区,使得相同的域的 tuple 分组可以聚集在一起。例如,下面是一个 groupBy 操作的示例:
如果你在分组数据流上执行聚合操作,聚合器会在每个分组(而不是整个区块)上运行。persistentAggregate 同样可以在一个分组数据里上运行,这种情况下聚合结果会存储在 MapState 中,其中的 key 就是分组的域名。
和其他操作一样,对分组数据流的聚合操作也可以以链式的方式执行。
融合(Merge)与联结(join)Trident API 的最后一部分是联结不同的数据流的操作。联结数据流最简单的方式就是将所有的数据流融合到一个流中。你可以使用 TridentTopology 的 merge 方法实现该操作,比如这样:
Trident 会将融合后的新数据流的域命名为为第一个数据流的输出域。
联结数据流的另外一种方法是使用 join。像 SQL 那样的标准 join 操作只能用于有限的输入数据集,对于无限的数据集就没有用武之地了。Trident 中的 join 只会应用于每个从 spout 中输出的小 batch。
下面是两个流的 join 操作的示例,其中一个流含有 [“key”, “val1″, “val2″] 域,另外一个流含有 [“x”, “val1″] 域:
上面的例子会使用 “key” 和 “x” 作为 join 的域来联结 stream1 和 stream2。Trident 要求先定义好新流的输出域,因为输入流的域可能会覆盖新流的域名。从 join 中输出的 tuple 中会包含:
在对不同的 spout 发送出的流进行 join 时,这些 spout 上会按照他们发送 batch 的方式进行同步处理。也就是说,一个处理中的 batch 中含有每个 spout 发送出的 tuple。
到这里你大概仍然会对如何进行窗口 join 操作感到困惑。窗口操作(包括平滑窗口、滚动窗口等 —— 译者注)主要是指将当前的 tuple 与过去若干小时时间段内的 tuple 联结起来的过程。
你可以使用 partitionPersist 和 stateQuery 来实现这个过程。过去一段时间内的 tuple 会以 join 域为关键字被保存到一个 state 源中。然后就可以使用 stateQuery 查询 join 域来实现这个“联结”(join)的过程。
转载自并发编程网 – ifeve.com本文链接地址: Apache Storm 官方文档 —— Trident API 概述