分享

Flume-ng-1.3.0 spooling source的方式增加了对目录的递归检测的支持

问题导读
1、如何修改修改getNextFile方法?
2、SpoolingFileLineReader类怎样增加方法?




原有的spooling source的方式只支持一级目录的文件变动的检测,修改了一下源码支持对目录的递归检测,可以支持多级目录下的文件变动
SpoolingFileLineReader类增加以下方法
  1. /**
  2.   * add by yangbutao
  3.   *
  4.   * @param files
  5.   * @param dir
  6.   * @param filter
  7.   */
  8. private void listDirFiles(List<File> files, File dir, FileFilter filter) {
  9.   File[] childs = dir.listFiles(filter);// 列出目录下所有的文件
  10.   for (int i = 0; i < childs.length; i++) {
  11.    if (childs[i].isFile()) {
  12.     files.add(childs[i]);
  13.    } else {
  14.     if (childs[i].isDirectory()) {
  15.      listDirFiles(files, childs[i], filter);// 如果是个目录,就继续递归列出其下面的文件.
  16.     }
  17.    }
  18.   }
  19. }
复制代码



修改getNextFile方法
  1. private Optional<FileInfo> getNextFile() {
  2.   /* Filter to exclude finished or hidden files */
  3.   FileFilter filter = new FileFilter() {
  4.    public boolean accept(File pathName) {
  5.     if ((pathName.getName().endsWith(completedSuffix))
  6.       || (pathName.getName().startsWith("."))) {
  7.      return false;
  8.     }
  9.     return true;
  10.    }
  11.   };
  12. // List<File> candidateFiles =Arrays.asList(directory.listFiles(filter));//注释掉原有的
  13.   List<File> candidateFiles = new ArrayList<File>();//新增的
  14.   listDirFiles(candidateFiles, directory, filter);//新增的
复制代码




已有(9)人评论

跳转到指定楼层
lbwahoo 发表于 2014-7-17 21:29:58
回复

使用道具 举报

semion 发表于 2014-9-5 17:04:52
flume-ng 1.5 支持这样改吗? 可以实现监控目录下一直变化的文件吗
回复

使用道具 举报

vanking 发表于 2015-1-22 10:52:36
楼主对于tail -F 获取增量日志文件的日志信息可能会造成数据丢失问题有声么优化方案没?
回复

使用道具 举报

小南3707 发表于 2015-1-26 09:51:27
vanking 发表于 2015-1-22 10:52
楼主对于tail -F 获取增量日志文件的日志信息可能会造成数据丢失问题有声么优化方案没?

你有什么方案么?
回复

使用道具 举报

为了明天time 发表于 2015-9-22 09:18:16
回复

使用道具 举报

墨小黑 发表于 2017-5-3 22:47:58
我用的spoolSources、fileChannels、hdfsinks的agent来传输文件,但是在hdfs里面得到的文件所有文件名都带有.138948494593这样的时间戳的后缀。请问怎么去掉后缀的时间戳啊???
回复

使用道具 举报

墨小黑 发表于 2017-5-5 17:21:10
求大神指导,SpooldirSources,fileChannels,HDFSSink,配置如下:
agent1.sources = spooldirSource
agent1.channels = fileChannel
agent1.sinks = hdfsSink

#配置sources,即被监听的源目录
agent1.sources.spooldirSource.type = spooldir
agent1.sources.spooldirSource.spoolDir = /tmp/testflumesource
agent1.sources.spooldirSource.channels = fileChannel
agent1.sources.spooldirSource.basenameHeader = true
agent1.sources.spooldirSource.basenameHeaderKey = basename
agent1.sources.spooldirSource.deletePolicy = immediate
# agent1.sources.spooldirSource.batchSize = 1000
#数据源编码格式
agent1.sources.spooldirSource.inputCharset = gbk
agent1.sources.spooldirSOurce.ignorePattern = ^(.)*\\.tmp$

#配置sinks,即目的目录
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = hdfs://node1.hde.h3c.com:8020/user/flume/%{topic}/%Y%m%d/%H%M
agent1.sinks.hdfsSink.fileType = DataStream
agent1.sinks.hdfsSink.writeFormat = Text
agent1.sinks.hdfsSink.hdfs.filePrefix = %{basename}
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true

# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.hdfsSink.hdfs.rollInterval = 1
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.hdfsSink.hdfs.rollSize = 12800000
agent1.sinks.hdfsSink.hdfs.rollCount = 0
agent1.sinks.hdfsSink.hdfs.batchSize = 1000
agent1.sinks.hdfsSink.channel = fileChannel

#channels,通道目录配置:把文件事件持久化到本地硬盘上
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop/apache-flume-1.6.0-bin/checkpoint
agent1.channels.fileChannel.dataDirs = /home/hadoop/apache-flume-1.6.0-bin/dataDir
agent1.channels.fileChannel.capacity = 2000000
agent1.channels.fileChannel.transactionCapacity = 2000
agent1.channels.fileChannel.keep-alive = 6

启动flume后,将大量文件copy到spoolDir下,只传了几个文件后就被打断了,然后我重启flume,会继续接着传输,但是传输大概100多个文件后又被打断了,再重启,flume就报错了。
尝试的方案:先将文件copy到spooldir,然后启动flume,大概150个文件后flume就被打断,再重启后还是会打断。
请问怎么处理啊!!!
回复

使用道具 举报

Yaphets 发表于 2017-6-6 16:10:20
墨小黑 发表于 2017-5-5 17:21
求大神指导,SpooldirSources,fileChannels,HDFSSink,配置如下:
agent1.sources = spooldirSource
ag ...

你这边问题解决了吗?
回复

使用道具 举报

墨小黑 发表于 2017-6-9 18:56:19
Yaphets 发表于 2017-6-6 16:10
你这边问题解决了吗?

已解决。
方案是从一个英文论坛找到的。
大意是:将文件复制到spooldir下,会造成agent在读,复制操作在写入,导致报错。所以解决方案就是,先复制到另一个文件夹下,再剪切(linux下的mv)到spooldir下,这样就不会造成我的那种情况了。

希望对你有帮助。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条