本帖最后由 pig2 于 2018-12-10 19:09 编辑
问题导读
1.为何产生window窗口计算?
2.你认为什么情况下使用Window Apply?
3.Window Fold可以用来做什么?
4.window 流是否可以union和join?
5.DataStream是否可以split?
关注最新经典文章,欢迎关注公众号
上一篇:
彻底明白Flink系统学习8:【Flink1.7编程基础】DataStream Transformations介绍
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26445
这篇文章,主要讲windows,那么我们思考为什么会产生windows?
我们前面流式处理,一条条消息处理不行吗?可以的。不过有些场景使用窗口更加适合,比如我们想看10分钟内下单量是多少。那么这时候我们就可以使用窗口计算了。窗口计算是对流式的一个封装,在某个时间内,对这个时间段内的数据一起处理。
理解了什么是windows,我们接着继续:
1.Window
KeyedStream → WindowedStream
可以在已经分区的KeyedStream上定义Windows。 Windows根据某些特征(例如,在最近5秒内到达的数据)对每个key中的数据进行分组。 有关窗口的说明,可参考窗口。
[mw_shl_code=java,true]dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
[/mw_shl_code]
2.WindowAll
DataStream → AllWindowedStream
Windows可以在常规DataStream上定义。 Windows根据某些特征(例如,在最近5秒内到达的数据)对所有流事件进行分组。 有关窗口的完整说明,可参考windows。
也就是说: 针对全局的不基于某个key进行分组的window的窗口函数的实现
注意:在许多情况下,这是不是并行transformation。 所有记录将收集在windowAll operator 的一个任务中。
[mw_shl_code=java,true]dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
[/mw_shl_code]
3.Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将通用功能应用于window,下面是window元素手工求和
如果是windowAll transformation,你需要替换为 AllWindowFunction
[mw_shl_code=java,true]windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public void apply (Tuple tuple,
Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public void apply (Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});[/mw_shl_code]
[mw_shl_code=scala,true]windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }[/mw_shl_code]
4.Window Reduce
WindowedStream → DataStream
将函数reduce功能应用于窗口并返回reduce的值。
[mw_shl_code=java,true]windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
});[/mw_shl_code]
[mw_shl_code=scala,true]windowedStream.reduce { _ + _ }
[/mw_shl_code]
5.Window Fold
WindowedStream → DataStream
将功能Fold功能应用于窗口并返回folded 值。 示例函数应用于序列(1,2,3,4,5)时,将序列folded 为字符串“start-1-2-3-4-5”:
[mw_shl_code=java,true]windowedStream.fold("start", new FoldFunction<Integer, String>() {
public String fold(String current, Integer value) {
return current + "-" + value;
}
});[/mw_shl_code]
[mw_shl_code=scala,true]val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })[/mw_shl_code]
6.windows聚合
WindowedStream → DataStream
聚合窗口的内容。 min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。
[mw_shl_code=java,true]windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");[/mw_shl_code]
[mw_shl_code=scala,true]windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")[/mw_shl_code]
上面窗口计算完毕,接着我们介绍新的内容,流和窗口等的结合
7.Union
DataStream* → DataStream
两个或多个数据流Union操作,来创建包含来自所有流的所有元素的新流。 注意:如果将数据流与自身union,则会在结果流中每个元素获取两次。[mw_shl_code=java,true]dataStream.union(otherStream1, otherStream2, ...);
[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.union(otherStream1, otherStream2, ...)
[/mw_shl_code]
8.Window Join
DataStream,DataStream → DataStream
给定的key和通用窗口Join两个数据流
[mw_shl_code=java,true]dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }[/mw_shl_code]
9.Interval Join
KeyedStream,KeyedStream → DataStream
在给定的时间间隔内使用公共keye ,Join 两个keye流的两个元素e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
[mw_shl_code=java,true]// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});[/mw_shl_code]
10.Window CoGroup
DataStream,DataStream → DataStream
在给定key和通用窗口上对两个数据流进行Cogroup。
[mw_shl_code=java,true]dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});[/mw_shl_code]
[mw_shl_code=scala,true]dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}[/mw_shl_code]
这里面CoGroup与join他们之间是有关联的,CoGroup可以实现datastream join。
11.Connect
DataStream,DataStream → ConnectedStreams
“Connect”两个保留类型的数据流。 Connect允许两个流之间的共享状态。
[mw_shl_code=java,true]DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);[/mw_shl_code]
[mw_shl_code=scala,true]someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)[/mw_shl_code]
12.CoMap, CoFlatMap
ConnectedStreams → DataStream
类似于连接数据流上的map和flatMap[mw_shl_code=java,true]connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});[/mw_shl_code]
[mw_shl_code=scala,true]connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)[/mw_shl_code]
13.Split
DataStream → SplitStream
根据某些标准将流拆分为两个或更多个流。
[mw_shl_code=java,true]SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});[/mw_shl_code]
[mw_shl_code=scala,true]val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)[/mw_shl_code]
14.Select
SplitStream → DataStream
从拆分流中select一个或多个流。
[mw_shl_code=java,true]SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");[/mw_shl_code]
[mw_shl_code=scala,true]val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")[/mw_shl_code]
15.Iterate
DataStream → IterativeStream → DataStream
通过将一个operator的输出重定向到某个先前的operator,在流中创建“feedback”循环。 这对于定义不断更新模型的算法特别有用。 以下代码以流开头并连续应用迭代体。 大于0的元素将被发送回feedback通道,其余元素将向下游转发。 有关完整说明,请参阅迭代。
[mw_shl_code=java,true]IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value <= 0;
}
});[/mw_shl_code]
[mw_shl_code=scala,true]initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}[/mw_shl_code]
16.Extract Timestamps
DataStream → DataStream
从记录中提取时间戳,以便使用 event time 语义的窗口。
[mw_shl_code=java,true]stream.assignTimestamps (new TimeStampExtractor() {...});[/mw_shl_code]
[mw_shl_code=scala,true]stream.assignTimestamps { timestampExtractor }
[/mw_shl_code]
|