pig2 发表于 2018-12-10 18:59:14

彻底明白Flink系统学习9:【Flink1.7编程】数据流Transformations介绍2窗口及相关操作

本帖最后由 pig2 于 2018-12-10 19:09 编辑

问题导读

1.为何产生window窗口计算?
2.你认为什么情况下使用Window Apply?
3.Window Fold可以用来做什么?
4.window 流是否可以union和join?
5.DataStream是否可以split?


关注最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
上一篇:
彻底明白Flink系统学习8:【Flink1.7编程基础】DataStream Transformations介绍
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26445


这篇文章,主要讲windows,那么我们思考为什么会产生windows?
我们前面流式处理,一条条消息处理不行吗?可以的。不过有些场景使用窗口更加适合,比如我们想看10分钟内下单量是多少。那么这时候我们就可以使用窗口计算了。窗口计算是对流式的一个封装,在某个时间内,对这个时间段内的数据一起处理。

理解了什么是windows,我们接着继续:
1.Window
KeyedStream → WindowedStream      
可以在已经分区的KeyedStream上定义Windows。 Windows根据某些特征(例如,在最近5秒内到达的数据)对每个key中的数据进行分组。 有关窗口的说明,可参考窗口。

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data


2.WindowAll
DataStream → AllWindowedStream
Windows可以在常规DataStream上定义。 Windows根据某些特征(例如,在最近5秒内到达的数据)对所有流事件进行分组。 有关窗口的完整说明,可参考windows。
也就是说: 针对全局的不基于某个key进行分组的window的窗口函数的实现
注意:在许多情况下,这是不是并行transformation。 所有记录将收集在windowAll operator 的一个任务中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data


3.Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将通用功能应用于window,下面是window元素手工求和
如果是windowAll transformation,你需要替换为 AllWindowFunction
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {

    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
      int sum = 0;
      for (value t: values) {
            sum += t.f1;
      }
      out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
      int sum = 0;
      for (value t: values) {
            sum += t.f1;
      }
      out.collect (new Integer(sum));
    }

});

windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }

4.Window Reduce
WindowedStream → DataStream
将函数reduce功能应用于窗口并返回reduce的值。
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {

    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
      return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }

});

windowedStream.reduce { _ + _ }


5.Window Fold
WindowedStream → DataStream
将功能Fold功能应用于窗口并返回folded 值。 示例函数应用于序列(1,2,3,4,5)时,将序列folded 为字符串“start-1-2-3-4-5”:
windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
      return current + "-" + value;
    }
});
val result: DataStream =
    windowedStream.fold("start", (str, i) => { str + "-" + i })

6.windows聚合
WindowedStream → DataStream
聚合窗口的内容。 min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy相同)。
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")

上面窗口计算完毕,接着我们介绍新的内容,流和窗口等的结合

7.Union
DataStream* → DataStream

两个或多个数据流Union操作,来创建包含来自所有流的所有元素的新流。 注意:如果将数据流与自身union,则会在结果流中每个元素获取两次。dataStream.union(otherStream1, otherStream2, ...);

dataStream.union(otherStream1, otherStream2, ...)


8.Window Join
DataStream,DataStream → DataStream      

给定的key和通用窗口Join两个数据流
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }

9.Interval Join
KeyedStream,KeyedStream → DataStream
在给定的时间间隔内使用公共keye ,Join 两个keye流的两个元素e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});

10.Window CoGroup
DataStream,DataStream → DataStream

在给定key和通用窗口上对两个数据流进行Cogroup。
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}
这里面CoGroup与join他们之间是有关联的,CoGroup可以实现datastream join。

11.Connect
DataStream,DataStream → ConnectedStreams

“Connect”两个保留类型的数据流。 Connect允许两个流之间的共享状态。
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
someStream : DataStream = ...
otherStream : DataStream = ...
val connectedStreams = someStream.connect(otherStream)

12.CoMap, CoFlatMap
ConnectedStreams → DataStream
类似于连接数据流上的map和flatMapconnectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
      return true;
    }

    @Override
    public Boolean map2(String value) {
      return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)

13.Split
DataStream → SplitStream
根据某些标准将流拆分为两个或更多个流。
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
      List<String> output = new ArrayList<String>();
      if (value % 2 == 0) {
            output.add("even");
      }
      else {
            output.add("odd");
      }
      return output;
    }
});

val split = someDataStream.split(
(num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

14.Select
SplitStream → DataStream
从拆分流中select一个或多个流。
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

15.Iterate
DataStream → IterativeStream → DataStream
通过将一个operator的输出重定向到某个先前的operator,在流中创建“feedback”循环。 这对于定义不断更新模型的算法特别有用。 以下代码以流开头并连续应用迭代体。 大于0的元素将被发送回feedback通道,其余元素将向下游转发。 有关完整说明,请参阅迭代。
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
      return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Integer value) throws Exception {
      return value <= 0;
    }
});
initialStream.iterate {
iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}

16.Extract Timestamps
DataStream → DataStream
从记录中提取时间戳,以便使用 event time 语义的窗口。
stream.assignTimestamps (new TimeStampExtractor() {...});
stream.assignTimestamps { timestampExtractor }


huangrong 发表于 2019-5-10 08:29:55

看完一节,出门上班

美丽天空 发表于 2018-12-11 23:34:09

感谢分享

YTP520YTP 发表于 2019-12-19 15:57:11

666666666666666666666666
页: [1]
查看完整版本: 彻底明白Flink系统学习9:【Flink1.7编程】数据流Transformations介绍2窗口及相关操作