立即注册 登录
About云-梭伦科技 返回首页

qcbb001的个人空间 https://aboutyun.com/?1399 [收藏] [复制] [分享] [RSS]

日志

Flume1.6版本的Spooling Directory Source支持Sub-directories【子目录】

已有 2306 次阅读2017-3-24 14:31

现实需求

Spooling Directory Source支持的feature官方介绍:
https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source

使用基本的Spooling Directory Source配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# ---------------------------------------
# Flume agent config
agent.sources = spool_source
agent.channels = memory_channel
agent.sinks = local_sink

# ---------------------------------------
# source1: spool_source -----------------
agent.sources.spool_source.type = spooldir
agent.sources.spool_source.channels = memory_channel
agent.sources.spool_source.spoolDir = /home/urey/spool_dir
agent.sources.spool_source.fileHeader = true

# ---------------------------------------
# sink1: local_sink ---------------------
agent.sinks.local_sink.type = file_roll
agent.sinks.local_sink.sink.directory = /home/urey/local_sink_dir
agent.sinks.local_sink.sink.rollInterval = 0
agent.sinks.local_sink.channel = memory_channel

# ---------------------------------------
# channel1: memory_channel --------------
agent.channels.memory_channel.type = memory
agent.channels.memory_channel.capacity = 10000
agent.channels.memory_channel.transactionCapacity = 10000
agent.channels.memory_channel.keep-alive = 20

实际的环境可能需要优化原生的Spooling Directory Source,使之支持:
1. 可以递归地对配置目录的所有子目录的所有文件进行监听;
2. 可以对日志的error,或者stack trace进行特殊的处理,使之生成一个flume event,而不是每一行生成一个flume event;

本文先解决第一个需求,递归地处理配置目录。


原生的SpoolDirectorySource实现逻辑

先简单介绍下处理逻辑:

第一个涉及到的类是:SpoolDirectorySource

它会启动线程对source处理,其中,它里面包含有ReliableSpoolingFileEventReader(下面的reader)用来将source变成一个个flume event;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public void run() {
int backoffInterval = 250;
try {
while (!Thread.interrupted()) {
List<Event> events = reader.readEvents(batchSize);
if (events.isEmpty()) {

break;
}

sourceCounter.addToEventReceivedCount(events.size());
sourceCounter.incrementAppendBatchReceivedCount();


try {
getChannelProcessor().processEventBatch(events);
reader.commit();
} catch (ChannelException ex) {
logger.warn("The channel is full, and cannot write data now. The " +
"source will try again after " + String.valueOf(backoffInterval) +
" milliseconds");
hitChannelException = true;
if (backoff) {
TimeUnit.MILLISECONDS.sleep(backoffInterval);
backoffInterval = backoffInterval << 1;

backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
backoffInterval;
}

continue;
}
backoffInterval = 250;
sourceCounter.addToEventAcceptedCount(events.size());
sourceCounter.incrementAppendBatchAcceptedCount();

}
} catch (Throwable t) {
logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
"Uncaught exception in SpoolDirectorySource thread. " +
"Restart or reconfigure Flume to continue processing.", t);
hasFatalError = true;
Throwables.propagate(t);
}
}



第二个涉及到的类就是ReliableSpoolingFileEventReader,在构造它的时候会进行如下操作:

  1. 健康检查;比如检测待监听的目录是否可读写:需要可读是因为要具有浏览目录的权限,需要可写是因为要具有新建,删除,修改,移动目录内文件的权限(比如对处理完的文件进行特殊标记,不再监听,比如test.log变成test.log.COMPLETED)
  2. 在监听目录生成flume处理的元数据文件:.flumespool

它最重要的方法是readEvents,这里面会通过EventDeserializer反序列化器将每一行内容变成一条flume event(这里假设EventDeserializer的实现类是LineDeserializer,处理的是文本文件)

在LineDeserializer类中会涉及到具体的如何将一行内容转化成一个flume event的逻辑,在处理错误日志和堆栈信息日志的时候会重点介绍这个类。

小结一下:SpoolDirectorySource会拿到监听目录下面满足条件的所有文件,按相应的策略从这些文件中找到一个文件采集日志。

好,到此为止,处理框架我们已经清晰,下面看下为什么原生的Spooling Directory Source不支持递归地监听子目录文件的变化。

在选择当前要处理哪个文件时,策略有三种:

1
2
3
4
/** Consume order. */
public enum ConsumeOrder {
OLDEST, YOUNGEST, RANDOM
}

  1. ConsumeOrder.RANDOM:随机找一个文件,完会读取后加上标志,下次会过滤掉,不会再读;
  2. ConsumeOrder.YOUNGEST:找一个最新的文件,完会读取后会加上标志,下次会过滤掉,不会再读;
  3. ConsumeOrder.OLDEST:找一个最旧的文件,完会读取后会加上标志,下次会过滤掉,不会再读;

但是,源码在查找所有的文件这块做了限制(过滤),会过滤掉下面4种类型的’文件’:

  1. 目录;
  2. 以.COMPLETED命名结尾的文件;
  3. 以.命名开头的文件;
  4. 被ignorePattern正则表达式匹配的文件(配置文件可配置);
1
2
3
4
5
6
7
8
9
10
11
12
13
/* Filter to exclude finished or hidden files */
FileFilter filter = new FileFilter() {
public boolean accept(File candidate) {
String fileName = candidate.getName();
if ((candidate.isDirectory()) ||
(fileName.endsWith(completedSuffix)) ||
(fileName.startsWith(".")) ||
ignorePattern.matcher(fileName).matches()) {
return false;
}
return true;
}
};

可以看出,这里过滤掉了所有的目录。


更改

羊毛出在羊身上,因此,这里我们找到了入手点,开始行动:

更改原来的过滤器,对目录返回true:

1
2
//candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter));
candidateFiles = getCandidateFiles(spoolDirectory);

递归地处理每一个目录,将每一个目录下面的文件全部加入候选集中,再从候选集中按相应的策略选择出一个待读取的文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Recursively gather candidate files
* @param directory the directory to gather files from
* @return list of files within the passed in directory
*/

private List<File> getCandidateFiles(File directory){
List<File> candidateFiles = new ArrayList<File>();
if (directory==null || ! directory.isDirectory()){
return candidateFiles;
}

for(File file : directory.listFiles(filter)){
if (file.isDirectory()) {
candidateFiles.addAll(getCandidateFiles(file));
}
else {
candidateFiles.add(file);
}
}

return candidateFiles;
}

之后,将这一行为变成配置的,默认不支持递归处理,如需该特性,在配置文件中加入:

1
agent.sources.spool_source.fileHeader = true


源码

完整的代码已经更新到了github上,如有需要,欢迎试用:

https://github.com/qwurey/flume-spool-recursive-directory-source


来自:

http://qiaowei.xyz/


路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条