Official documentation for the features supported by the Spooling Directory Source:
https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
A basic Spooling Directory Source configuration looks like this:
```
# ---------------------------------------
# Flume agent config
agent.sources = spool_source
agent.channels = memory_channel
agent.sinks = local_sink

# ---------------------------------------
# source1: spool_source -----------------
agent.sources.spool_source.type = spooldir
agent.sources.spool_source.channels = memory_channel
agent.sources.spool_source.spoolDir = /home/urey/spool_dir
agent.sources.spool_source.fileHeader = true

# ---------------------------------------
# sink1: local_sink ---------------------
agent.sinks.local_sink.type = file_roll
agent.sinks.local_sink.sink.directory = /home/urey/local_sink_dir
agent.sinks.local_sink.sink.rollInterval = 0
agent.sinks.local_sink.channel = memory_channel

# ---------------------------------------
# channel1: memory_channel --------------
agent.channels.memory_channel.type = memory
agent.channels.memory_channel.capacity = 10000
agent.channels.memory_channel.transactionCapacity = 10000
agent.channels.memory_channel.keep-alive = 20
```
A real-world deployment may need the stock Spooling Directory Source extended so that it can:

1. recursively watch all files in all subdirectories of the configured directory;
2. handle error logs and stack traces specially, so that a whole multi-line entry becomes one flume event instead of one event per line.

This post tackles the first requirement: processing the configured directory recursively.
First, a quick outline of the processing logic.
The first class involved is SpoolDirectorySource.
It starts a thread that drives the source; internally it holds a ReliableSpoolingFileEventReader (the reader below), which turns the source files into flume events:
```java
@Override
public void run() {
  int backoffInterval = 250;
  try {
    while (!Thread.interrupted()) {
      List<Event> events = reader.readEvents(batchSize);
      if (events.isEmpty()) {
        break;
      }
      sourceCounter.addToEventReceivedCount(events.size());
      sourceCounter.incrementAppendBatchReceivedCount();

      try {
        getChannelProcessor().processEventBatch(events);
        reader.commit();
      } catch (ChannelException ex) {
        logger.warn("The channel is full, and cannot write data now. The " +
            "source will try again after " + String.valueOf(backoffInterval) +
            " milliseconds");
        hitChannelException = true;
        if (backoff) {
          TimeUnit.MILLISECONDS.sleep(backoffInterval);
          backoffInterval = backoffInterval << 1;
          backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
              backoffInterval;
        }
        continue;
      }
      backoffInterval = 250;
      sourceCounter.addToEventAcceptedCount(events.size());
      sourceCounter.incrementAppendBatchAcceptedCount();
    }
  } catch (Throwable t) {
    logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
        "Uncaught exception in SpoolDirectorySource thread. " +
        "Restart or reconfigure Flume to continue processing.", t);
    hasFatalError = true;
    Throwables.propagate(t);
  }
}
```
The second class involved is ReliableSpoolingFileEventReader, which performs some setup when it is constructed.
Its most important method is readEvents, where an EventDeserializer turns each line of content into one flume event (assuming here that the EventDeserializer implementation is LineDeserializer, i.e. plain text files are being processed).
LineDeserializer holds the concrete logic for converting a line of content into a flume event; it will be examined in detail when we handle error logs and stack-trace logs.
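As a preview of that logic, the line-to-event conversion can be sketched outside of Flume in a few lines. This is a simplification, not the actual Flume code: a real Flume Event carries a header map plus a byte[] body, whereas here an event body is just a String; the method name readEvents merely mirrors the deserializer's API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class LineDeserializerSketch {

    /**
     * Simplified stand-in for LineDeserializer#readEvents: consume up to
     * numEvents lines from the reader, turning each line into one "event".
     */
    static List<String> readEvents(BufferedReader in, int numEvents) throws IOException {
        List<String> events = new ArrayList<>();
        String line;
        while (events.size() < numEvents && (line = in.readLine()) != null) {
            events.add(line); // one line -> one event body
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new StringReader("a\nb\nc"));
        System.out.println(readEvents(in, 2)); // first batch: [a, b]
        System.out.println(readEvents(in, 2)); // remainder:   [c]
    }
}
```

This strict one-line-one-event behavior is exactly what the second requirement above will need to change for multi-line stack traces.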
To sum up: SpoolDirectorySource gathers all qualifying files under the watched directory and, following the configured strategy, picks one file to collect logs from.
With the processing framework now clear, let's look at why the stock Spooling Directory Source cannot recursively watch files in subdirectories.
When choosing which file to process next, there are three strategies:
```java
/** Consume order. */
public enum ConsumeOrder {
  OLDEST, YOUNGEST, RANDOM
}
```
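How the chosen order might be applied to the candidate set can be sketched as follows. This is an illustration only, not the actual Flume implementation (the real reader, for instance, also breaks ties deterministically when modification times are equal):

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

public class ConsumeOrderSketch {

    public enum ConsumeOrder { OLDEST, YOUNGEST, RANDOM }

    /** Pick the next file to consume from the candidate set, or null if empty. */
    static File selectNext(List<File> candidates, ConsumeOrder order, Random rng) {
        if (candidates.isEmpty()) {
            return null;
        }
        switch (order) {
            case OLDEST:   // smallest modification time first
                return candidates.stream()
                        .min(Comparator.comparingLong(File::lastModified)).get();
            case YOUNGEST: // largest modification time first
                return candidates.stream()
                        .max(Comparator.comparingLong(File::lastModified)).get();
            default:       // RANDOM
                return candidates.get(rng.nextInt(candidates.size()));
        }
    }

    public static void main(String[] args) throws Exception {
        File old = File.createTempFile("old", ".log");
        File fresh = File.createTempFile("new", ".log");
        old.setLastModified(1_000_000L);   // force distinct timestamps
        fresh.setLastModified(2_000_000L);
        List<File> candidates = Arrays.asList(fresh, old);
        System.out.println(selectNext(candidates, ConsumeOrder.OLDEST, new Random()) == old);
        System.out.println(selectNext(candidates, ConsumeOrder.YOUNGEST, new Random()) == fresh);
        old.delete();
        fresh.delete();
    }
}
```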
However, when collecting the candidate files, the source code applies a filter that excludes the following four kinds of 'files':
```java
/* Filter to exclude finished or hidden files */
FileFilter filter = new FileFilter() {
  public boolean accept(File candidate) {
    String fileName = candidate.getName();
    if ((candidate.isDirectory()) ||
        (fileName.endsWith(completedSuffix)) ||
        (fileName.startsWith(".")) ||
        ignorePattern.matcher(fileName).matches()) {
      return false;
    }
    return true;
  }
};
```
As you can see, every directory is filtered out.
So the root cause sits right in this code, which gives us our entry point. Time to act:
First, change the original filter so that it returns true for directories, then replace the flat directory listing with a recursive scan:
```java
//candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter));
candidateFiles = getCandidateFiles(spoolDirectory);
```
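The adjusted filter that this replacement relies on could look like the following sketch. The completedSuffix and ignorePattern values here are stand-ins for what the reader receives from its configuration (shown with the Flume defaults); the only behavioral change from the stock filter is the directory branch:

```java
import java.io.File;
import java.io.FileFilter;
import java.util.regex.Pattern;

public class RecursiveFilterSketch {

    // Stand-ins for configured values; these mirror the Flume defaults.
    static final String COMPLETED_SUFFIX = ".COMPLETED";
    static final Pattern IGNORE_PATTERN = Pattern.compile("^$");

    /* Filter to exclude finished or hidden files, but — unlike the stock
     * filter — ACCEPT directories so the recursive scan can descend. */
    static final FileFilter filter = new FileFilter() {
        public boolean accept(File candidate) {
            if (candidate.isDirectory()) {
                return true; // changed: was "return false"
            }
            String fileName = candidate.getName();
            return !fileName.endsWith(COMPLETED_SUFFIX)
                    && !fileName.startsWith(".")
                    && !IGNORE_PATTERN.matcher(fileName).matches();
        }
    };
}
```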
Each directory is then processed recursively: all files underneath it are added to the candidate set, and the configured strategy selects one file from that set to read:
```java
/**
 * Recursively gather candidate files
 * @param directory the directory to gather files from
 * @return list of files within the passed in directory
 */
private List<File> getCandidateFiles(File directory) {
  List<File> candidateFiles = new ArrayList<File>();
  if (directory == null || !directory.isDirectory()) {
    return candidateFiles;
  }
  for (File file : directory.listFiles(filter)) {
    if (file.isDirectory()) {
      candidateFiles.addAll(getCandidateFiles(file));
    } else {
      candidateFiles.add(file);
    }
  }
  return candidateFiles;
}
```
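Restated as a self-contained program, the traversal can be exercised against a throw-away directory tree. The only addition here over the snippet above is a null check, since File.listFiles returns null on I/O errors; the simplified filter only skips hidden files:

```java
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

public class RecursiveScanDemo {

    // Simplified filter: accept directories, skip hidden files.
    static final FileFilter filter = f -> f.isDirectory() || !f.getName().startsWith(".");

    /** Recursively gather all regular files under the given directory. */
    static List<File> getCandidateFiles(File directory) {
        List<File> candidateFiles = new ArrayList<>();
        if (directory == null || !directory.isDirectory()) {
            return candidateFiles;
        }
        File[] children = directory.listFiles(filter);
        if (children == null) { // I/O error or directory vanished mid-scan
            return candidateFiles;
        }
        for (File file : children) {
            if (file.isDirectory()) {
                candidateFiles.addAll(getCandidateFiles(file));
            } else {
                candidateFiles.add(file);
            }
        }
        return candidateFiles;
    }

    public static void main(String[] args) throws Exception {
        File root = Files.createTempDirectory("spool").toFile();
        new File(root, "sub/inner").mkdirs();
        Files.createFile(new File(root, "a.log").toPath());
        Files.createFile(new File(root, "sub/b.log").toPath());
        Files.createFile(new File(root, "sub/inner/c.log").toPath());
        System.out.println(getCandidateFiles(root).size()); // 3 files across all levels
    }
}
```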
Finally, this behavior is made configurable: recursion is off by default, and enabling it requires an extra property in the agent configuration, along these lines (the property name below is illustrative; check the linked repository for the exact key):

```
agent.sources.spool_source.recursiveDirectorySearch = true
```
The complete code has been pushed to GitHub; if you need this feature, you are welcome to try it out:
https://github.com/qwurey/flume-spool-recursive-directory-source
Source:
http://qiaowei.xyz/