本帖最后由 arsenduan 于 2017-3-29 19:47 编辑
org.apache.flume.sink.hdfs.bucketwriter.close failed to close() hdfswrite for file
org.apache.flume.sink.hdfs.bucketwriter.close 390 failed to rename() file
我们来看,close方法:
[mw_shl_code=java,true] public synchronized void close(boolean callCloseCallback)
throws IOException, InterruptedException {
checkAndThrowInterruptedException();
try {
// close的时候先执行flush方法,清空batchCount,并调用HDFSWriter的sync方法
flush();
} catch (IOException e) {
LOG.warn("pre-close flush failed", e);
}
boolean failedToClose = false;
LOG.info("Closing {}", bucketPath);
// 创建一个关闭线程,这个线程会调用HDFSWriter的close方法
CallRunner<Void> closeCallRunner = createCloseCallRunner();
if (isOpen) { // 如果文件还开着
try {
// 执行HDFSWriter的close方法
callWithTimeout(closeCallRunner);
sinkCounter.incrementConnectionClosedCount();
} catch (IOException e) {
LOG.warn(
"failed to close() HDFSWriter for file (" + bucketPath +
"). Exception follows.", e);
sinkCounter.incrementConnectionFailedCount();
failedToClose = true;
// 关闭文件失败的话起个线程,retryInterval秒后继续执行
final Callable<Void> scheduledClose =
createScheduledCloseCallable(closeCallRunner);
timedRollerPool.schedule(scheduledClose, retryInterval,
TimeUnit.SECONDS);
}
isOpen = false;
} else {
LOG.info("HDFSWriter is already closed: {}", bucketPath);
}
// timedRollFuture就是根据hdfs.rollInterval配置生成的一个属性。如果hdfs.rollInterval配置为0,那么不会执行以下代码
// 因为要close文件,所以如果开启了hdfs.rollInterval等待时间到了flush文件,由于文件已经关闭,再次关闭会有问题
// 所以这里取消timedRollFuture线程的执行
if (timedRollFuture != null && !timedRollFuture.isDone()) {
timedRollFuture.cancel(false); // do not cancel myself if running!
timedRollFuture = null;
}
// 没有配置hdfs.idleTimeout, 不会执行
if (idleFuture != null && !idleFuture.isDone()) {
idleFuture.cancel(false); // do not cancel myself if running!
idleFuture = null;
}
// 重命名文件,如果报错了,不会重命名文件
if (bucketPath != null && fileSystem != null && !failedToClose) {
// 将 /data/2015/07/20/15/flume.1437375933234.txt.tmp 重命名为 /data/2015/07/20/15/flume.1437375933234.txt
renameBucket(bucketPath, targetPath, fileSystem);
}
if (callCloseCallback) { // callCloseCallback是close方法的参数
// 调用关闭文件的回调函数,也就是BucketWriter的onCloseCallback属性
// 这个onCloseCallback属性就是在HDFSEventSink里的回调函数closeCallback。 用来处理sfWriters.remove(bucketPath);
// 如果onCloseCallback属性为true,那么说明这个BucketWriter已经不会再次open新的文件了。生命周期已经到了。
// onCloseCallback只有在append方法中调用shouldRotate方法的时候需要close文件的时候才会传入false,其他情况都是true
runCloseAction();
closed = true;
}
}[/mw_shl_code]
上面是关闭方法,及重命名还有超时
// 没有配置hdfs.idleTimeout, 不会执行
if (idleFuture != null && !idleFuture.isDone()) {
idleFuture.cancel(false); // do not cancel myself if running!
idleFuture = null;
}
// 重命名文件,如果报错了,不会重命名文件
if (bucketPath != null && fileSystem != null && !failedToClose) {
// 将 /data/2015/07/20/15/flume.1437375933234.txt.tmp 重命名为 /data/2015/07/20/15/flume.1437375933234.txt
renameBucket(bucketPath, targetPath, fileSystem);
}
要么是超时的问题,要么是close的问题。其实楼主已经设置的超时时间不短了。
可是滚动的size太大了。
[mw_shl_code=bash,true]device_online_event.sinks.sinkHdfs.hdfs.rollSize = 125000000
[/mw_shl_code]
建议减小。当然也可以修改源码,失败的可以重试。
|