分享

关于流式计算中窗口的理解问题

ighack 2019-3-28 12:36:36 发表于 疑问解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 5119
一般在流式计算中都会有一个窗口的概念(window)。
比较在flink中的
.timeWindow(Time.seconds(5))

它将无边界数据按时间划分为每5秒一个窗口
那么第次聚合计算(sum)都是计算出当前窗口的值
也就是说我计算出的是10:00:05,10:00:10,10:00:15,10:00:20,10:00:25
这5次在不同时间段内的的结果。如果我想要得到的是一个月的sum结果。我难道要将时间窗口设成一个月吗?
如果是一年呢?10年呢。

还有一种思路是。将每5秒的结果存下来做中间结果。
但如果我的数据量很大,存下来的中间结果很多。在计算的话不就成了一个批处理吗?无法实时得到结果。
而且这样计算也有问题。比如我的数据有修改,或者其他的什么。并不适合这种有中间结果在聚合的场景,该怎么做。


还有就是计算结果的不准确性。如果数据延时到达。上一个窗口都已经关闭了。计算出来的结果就漏掉了这个延时到达的数据。其结果就不准确了
flink中怎么去处理的

已有(4)人评论

跳转到指定楼层
s060403072 发表于 2019-3-28 16:45:32
楼主还是比较喜欢思考的。
从价值的角度来讲:
Flink是为流式而产生。
何为流?何为实时?
流也就是消息来到接着就处理。这就是流式处理。所以这也是实时处理。

那么为何产生窗口?
1.窗口是为了更好地聚合数据。
2.比如对于实时性没有要求那么高,我们只想看到最近5秒的数据,那么就可以使用滑动窗口。
如果想看用户登陆网站,连续操作时间内都做了哪些事情,可以使用会话窗口。

对于一个月的处理?
可以使用窗口,但是这就是失去了实时的意义,一个月的数据,可以使用离线处理,比如dataset或则MapReduce也是可以的。


回复

使用道具 举报

ighack 发表于 2019-3-29 14:00:04
如果使用离线计算的话,如spark那么就没有实时性了
我考虑过使用imply,设定好时间粒度可以多维度聚合。但这个不可以修改数据
flink里说能有stream和batch两种特性。那么batch的特性是怎么体现的呢
回复

使用道具 举报

s060403072 发表于 2019-3-29 16:38:10
ighack 发表于 2019-3-29 14:00
如果使用离线计算的话,如spark那么就没有实时性了
我考虑过使用imply,设定好时间粒度可以多维度聚合。但 ...

同一套api,比如你接受的是流数据,那直接使用datastream,如果是批处理,直接初始化dataset,相当简单。可以看看相关程序,一目了然。
[mw_shl_code=scala,true]object WordCount {
def main(args: Array[String]) {
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val dataset= env.fromElements("To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
val counts = dataset.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, (1,2)) }
      .groupBy(0)
      .sum(1)

// execute and print result
counts.print()
  }
}[/mw_shl_code]



下面为datastream
[mw_shl_code=scala,true]1 使用并行度为1的source
  public class MyNoParalleSource implements SourceFunction<Long>{
  
      private long count = 1L;
  
      private boolean isRunning = true;
  
      /**
       * 主要的方法
       * 启动一个source
       * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
       *
       * @param ctx
       * @throws Exception
       */
      @Override
      public void run(SourceContext<Long> ctx) throws Exception {
          while(isRunning){
              ctx.collect(count);
              count++;
              //每秒产生一条数据
              Thread.sleep(1000);
          }
      }
      * 取消一个cancel的时候会调用的方法
      @Override
      public void cancel() {
          isRunning = false;
      }
  }
  
  2 Main方法执行
  public class StreamingDemoWithMyNoPralalleSource {
      public static void main(String[] args) throws Exception {
      //获取Flink的运行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      //获取数据源
      DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1); //注意:针对此source,并行度只能设置为1
      DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
          @Override
          public Long map(Long value) throws Exception {
              System.out.println("接收到数据:" + value);
              return value;
          }
      });

  //每2秒钟处理一次数据
  DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
  //打印结果
  sum.print().setParallelism(1);
  String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
  env.execute(jobName);
   }
  }   [/mw_shl_code]
回复

使用道具 举报

ighack 发表于 2019-3-29 17:15:28
明白。我本来还期待这个可以是将stream和batch混合在一起。将实时数据和离线数合在一起做聚合。如果是这样的话。我可以一边接受kafka,一边将hdfs的数据取出合在一起这样就可以实现一个月的sum。而且是实时的。
如果只是说用同一套框架。只不过是用stream特性处理流。用batch的特性处理离线数据。是一个分开的。对我来说还是达不到需求
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条