分享

flume 频繁产生小文件原因分析及解决办法

本帖最后由 xuanxufeng 于 2017-3-26 18:26 编辑
问题导读

1.flume滚动配置为何不起作用?
2.通过源码分析得出什么原因?
3.该如何解决flume小文件?





本人在测试hdfs的sink,发现sink端的文件滚动配置项起不到任何作用,配置如下:
[mw_shl_code=bash,true]a1.sinks.k1.type=hdfs  
a1.sinks.k1.channel=c1  
a1.sinks.k1.hdfs.useLocalTimeStamp=true  
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M  
a1.sinks.k1.hdfs.filePrefix=XXX  
a1.sinks.k1.hdfs.rollInterval=60  
a1.sinks.k1.hdfs.rollSize=0  
a1.sinks.k1.hdfs.rollCount=0  
a1.sinks.k1.hdfs.idleTimeout=0  [/mw_shl_code]
这里配置的是60秒,文件滚动一次,也就每隔60秒,会新产生一个文件【前提,flume的source端有数据来】
这里注意

[mw_shl_code=bash,true]useLocalTimeStamp  
[/mw_shl_code]

这个属性的目的就是相当于时间戳的拦截器,否则%Y 等等这些东西都识别不了
要么用上面这个属性,要么用时间戳拦截器。

但是当我启动flume的时候,运行十几秒,不断写入数据,发现hdfs端频繁的产生文件,每隔几秒就有新文件产生
而且在flume的日志输出可以频繁看到这句:
[WARN] Block Under-replication detected. Rotating file.
只要有这句,就会产生一个新的文件
意思就是检测到复制块正在滚动文件,结合源码看下:

[mw_shl_code=java,true]private boolean shouldRotate() {  
    boolean doRotate = false;  
  
    if (writer.isUnderReplicated()) {  
      this.isUnderReplicated = true;  
      doRotate = true;  
    } else {  
      this.isUnderReplicated = false;  
    }  
  
    if ((rollCount > 0) && (rollCount <= eventCounter)) {  
      LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);  
      doRotate = true;  
    }  
  
    if ((rollSize > 0) && (rollSize <= processSize)) {  
      LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);  
      doRotate = true;  
    }  
  
    return doRotate;  
  }  [/mw_shl_code]
这是判断是否滚动文件,但是这里面的第一判断条件是判断是否当前的HDFSWriter正在复制块

[mw_shl_code=java,true]public boolean isUnderReplicated() {  
    try {  
      int numBlocks = getNumCurrentReplicas();  
      if (numBlocks == -1) {  
        return false;  
      }  
      int desiredBlocks;  
      if (configuredMinReplicas != null) {  
        desiredBlocks = configuredMinReplicas;  
      } else {  
        desiredBlocks = getFsDesiredReplication();  
      }  
      return numBlocks < desiredBlocks;  
    } catch (IllegalAccessException e) {  
      logger.error("Unexpected error while checking replication factor", e);  
    } catch (InvocationTargetException e) {  
      logger.error("Unexpected error while checking replication factor", e);  
    } catch (IllegalArgumentException e) {  
      logger.error("Unexpected error while checking replication factor", e);  
    }  
    return false;  
  }  [/mw_shl_code]
通过读取的配置复制块数量和当前正在复制的块比较,判断是否正在被复制

[mw_shl_code=java,true]if (shouldRotate()) {  
      boolean doRotate = true;  
  
      if (isUnderReplicated) {  
        if (maxConsecUnderReplRotations > 0 &&  
            consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {  
          doRotate = false;  
          if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {  
            LOG.error("Hit max consecutive under-replication rotations ({}); " +  
                "will not continue rolling files under this path due to " +  
                "under-replication", maxConsecUnderReplRotations);  
          }  
        } else {  
          LOG.warn("Block Under-replication detected. Rotating file.");  
        }  
        consecutiveUnderReplRotateCount++;  
      } else {  
        consecutiveUnderReplRotateCount = 0;  
      }  [/mw_shl_code]

以上方法,入口是shouldRotate()方法,也就是如果你配置了rollcount,rollsize大于0,会按照你的配置来滚动的,但是在入口进来后,发现,又去判断了是否有块在复制;

里面就读取了一个固定变量maxConsecUnderReplRotations=30,也就是正在复制的块,最多之能滚动出30个文件,如果超过了30次,该数据块如果还在复制中,那么数据也不会滚动了,doRotate=false,不会滚动了,所以有的人发现自己一旦运行一段时间,会出现30个文件
再结合上面的源码看一下:
如果你配置了10秒滚动一次,写了2秒,恰好这时候该文件内容所在的块在复制中,那么虽然没到10秒,依然会给你滚动文件的,文件大小,事件数量的配置同理了。

为了解决上述问题,我们只要让程序感知不到写的文件所在块正在复制就行了,怎么做呢??
只要让isUnderReplicated()方法始终返回false就行了
该方法是通过当前正在被复制的块和配置中读取的复制块数量比较的,我们能改的就只有配置项中复制块的数量,而官方给出的flume配置项中有该项

hdfs.minBlockReplicas

Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
默认读的是hadoop中的dfs.replication属性,该属性默认值是3
这里我们也不去该hadoop中的配置,在flume中添加上述属性为1即可
配置如下:

[mw_shl_code=bash,true]a1.sinks.k1.type=hdfs  
a1.sinks.k1.channel=c1  
a1.sinks.k1.hdfs.useLocalTimeStamp=true  
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M  
a1.sinks.k1.hdfs.filePrefix=cmcc  
a1.sinks.k1.hdfs.minBlockReplicas=1  
#a1.sinks.k1.hdfs.fileType=DataStream  
#a1.sinks.k1.hdfs.writeFormat=Text  
a1.sinks.k1.hdfs.rollInterval=60  
a1.sinks.k1.hdfs.rollSize=0  
a1.sinks.k1.hdfs.rollCount=0  
a1.sinks.k1.hdfs.idleTimeout=0  [/mw_shl_code]
这样程序就永远不会因为文件所在块的复制而滚动文件了,只会根据你的配置项来滚动文件了,试试吧!!

推荐参考
flume的hdfs.minBlockReplicas参数的作用
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21364



来自:csdn
作者:chiweitree


本帖被以下淘专辑推荐:

已有(8)人评论

跳转到指定楼层
Mr.k 发表于 2017-3-27 09:18:50
好文章学习了
回复

使用道具 举报

是饭饭 发表于 2017-3-27 10:22:39
好文章,学习了
回复

使用道具 举报

Mr.k 发表于 2017-6-30 17:49:52
多谢楼主分享
回复

使用道具 举报

nevermind 发表于 2018-3-19 15:52:11
设置了这个也没有用啊
回复

使用道具 举报

langke93 发表于 2018-3-20 07:42:07
nevermind 发表于 2018-3-19 15:52
设置了这个也没有用啊

如何设置的,flume是哪个版本,很多时候可能跟自己的操作也有关系
回复

使用道具 举报

jinwensc 发表于 2018-5-5 18:04:32
还要注意的是压缩格式问题
文件大小是压缩前的,我测试的数据压缩后很小,让我一直以为是配置问题造成小文件
rollInterval好像也不管用,每一秒都在产生文件,用roll比较好
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条