分享

flume 两个sink出现问题

Sno 发表于 2017-3-29 18:45:32 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 2 13943
一个spooldir source, 两个file channel,一个hbase sink, 一个hdfs sink,在ctrl + c (kill 命令)时报异常
QQ截图20170329184301.png

但是我使用一个spooldir source, 一个file channel,一个hdfs sink不会出现异常。

配置文件如下:
device_online_event.sources = srcFile

device_online_event.sinks = sinkHBase sinkHdfs

device_online_event.channels = chHBase chHdfs

# Describe/configure the source
device_online_event.sources.srcFile.type = spooldir
device_online_event.sources.srcFile.channels = chHBase chHdfs
device_online_event.sources.srcFile.spoolDir = /opt/DDMLogerSave/online_event/
device_online_event.sources.srcFile.fileSuffix = .COMPLETED
device_online_event.sources.srcFile.deletePolicy = immediate
device_online_event.sources.srcFile.batchSize = 100
device_online_event.sources.srcFile.inputCharset = UTF-8
device_online_event.sources.srcFile.ignorePattern = ^(.)*\\.tmp$
device_online_event.sources.srcFile.selector.type = replicating


#hbase===========================================================================================
device_online_event.channels.chHBase.type = file
device_online_event.channels.chHBase.dataDirs = /opt/flume_data/device/online_event_hbase
device_online_event.channels.chHBase.checkpointDir= /opt/cp_flume_data/device/online_event_hbase
device_online_event.channels.chHBase.transactionCapacity = 100
device_online_event.channels.chHBase.capacity = 200000


# Describe the hbase sink
device_online_event.sinks.sinkHBase.channel = chHBase
device_online_event.sinks.sinkHBase.type = hbase
device_online_event.sinks.sinkHBase.znodeParent = /hbase
device_online_event.sinks.sinkHBase.batchSize = 100
device_online_event.sinks.sinkHBase.table = device:online_event
device_online_event.sinks.sinkHBase.columnFamily = o
device_online_event.sinks.sinkHBase.serializer = com.dandanman.flume.sink.DeviceOnlineEventHBaseSerializer
device_online_event.sinks.sinkHBase.zookeeperQuorum = hdfs2:2181,hdfs4:2181:hdfs8:2181

#hdfs===========================================================================================
# Describe file channel
device_online_event.channels.chHdfs.type = file
device_online_event.channels.chHdfs.dataDirs = /opt/flume_data/device/online_event_hdfs
device_online_event.channels.chHdfs.checkpointDir= /opt/cp_flume_data/device/online_event_hdfs
device_online_event.channels.chHdfs.transactionCapacity = 100
device_online_event.channels.chHdfs.capacity = 200000

# Describe the hdfs sink
device_online_event.sinks.sinkHdfs.channel = chHdfs
device_online_event.sinks.sinkHdfs.type = hdfs
device_online_event.sinks.sinkHdfs.hdfs.path = hdfs://ns/logs/device/online_event/%Y%m%d
device_online_event.sinks.sinkHdfs.hdfs.writeFormat = Text
device_online_event.sinks.sinkHdfs.hdfs.minBlockReplicas = 1
device_online_event.sinks.sinkHdfs.hdfs.fileType = DataStream

device_online_event.sinks.sinkHdfs.hdfs.rollInterval = 0
device_online_event.sinks.sinkHdfs.hdfs.rollSize = 125000000
device_online_event.sinks.sinkHdfs.hdfs.rollCount = 0
device_online_event.sinks.sinkHdfs.hdfs.idleTimeout = 0
device_online_event.sinks.sinkHdfs.hdfs.useLocalTimeStamp = true
device_online_event.sinks.sinkHdfs.hdfs.batchSize = 100
device_online_event.sinks.sinkHdfs.hdfs.threadsPoolSize = 10
device_online_event.sinks.sinkHdfs.hdfs.callTimeout = 600000
#device_online_event.sinks.sinkHdfs.hdfs.round = true
#device_online_event.sinks.sinkHdfs.hdfs.roundUnit = minute
#device_online_event.sinks.sinkHdfs.hdfs.roundValue = 1



有谁遇到过这个问题吗?求解

已有(2)人评论

跳转到指定楼层
arsenduan 发表于 2017-3-29 19:45:53
本帖最后由 arsenduan 于 2017-3-29 19:47 编辑

org.apache.flume.sink.hdfs.bucketwriter.close failed to close() hdfswrite for file
org.apache.flume.sink.hdfs.bucketwriter.close 390 failed to rename() file

我们来看,close方法:
[mw_shl_code=java,true] public synchronized void close(boolean callCloseCallback)
        throws IOException, InterruptedException {
    checkAndThrowInterruptedException();
    try {
      // close的时候先执行flush方法,清空batchCount,并调用HDFSWriter的sync方法
      flush();
    } catch (IOException e) {
      LOG.warn("pre-close flush failed", e);
    }
    boolean failedToClose = false;
    LOG.info("Closing {}", bucketPath);
    // 创建一个关闭线程,这个线程会调用HDFSWriter的close方法
    CallRunner<Void> closeCallRunner = createCloseCallRunner();
    if (isOpen) { // 如果文件还开着
      try {
          // 执行HDFSWriter的close方法
        callWithTimeout(closeCallRunner);
        sinkCounter.incrementConnectionClosedCount();
      } catch (IOException e) {
        LOG.warn(
          "failed to close() HDFSWriter for file (" + bucketPath +
            "). Exception follows.", e);
        sinkCounter.incrementConnectionFailedCount();
        failedToClose = true;
        // 关闭文件失败的话起个线程,retryInterval秒后继续执行
        final Callable<Void> scheduledClose =
          createScheduledCloseCallable(closeCallRunner);
        timedRollerPool.schedule(scheduledClose, retryInterval,
          TimeUnit.SECONDS);
      }
      isOpen = false;
    } else {
      LOG.info("HDFSWriter is already closed: {}", bucketPath);
    }

    // timedRollFuture就是根据hdfs.rollInterval配置生成的一个属性。如果hdfs.rollInterval配置为0,那么不会执行以下代码
    // 因为要close文件,所以如果开启了hdfs.rollInterval等待时间到了flush文件,由于文件已经关闭,再次关闭会有问题
    // 所以这里取消timedRollFuture线程的执行
    if (timedRollFuture != null && !timedRollFuture.isDone()) {
      timedRollFuture.cancel(false); // do not cancel myself if running!
      timedRollFuture = null;
    }

    // 没有配置hdfs.idleTimeout, 不会执行
    if (idleFuture != null && !idleFuture.isDone()) {
      idleFuture.cancel(false); // do not cancel myself if running!
      idleFuture = null;
    }

    // 重命名文件,如果报错了,不会重命名文件
    if (bucketPath != null && fileSystem != null && !failedToClose) {
      // 将 /data/2015/07/20/15/flume.1437375933234.txt.tmp 重命名为 /data/2015/07/20/15/flume.1437375933234.txt
      renameBucket(bucketPath, targetPath, fileSystem);
    }
    if (callCloseCallback) { // callCloseCallback是close方法的参数

      // 调用关闭文件的回调函数,也就是BucketWriter的onCloseCallback属性
      // 这个onCloseCallback属性就是在HDFSEventSink里的回调函数closeCallback。 用来处理sfWriters.remove(bucketPath);
      // 如果onCloseCallback属性为true,那么说明这个BucketWriter已经不会再次open新的文件了。生命周期已经到了。
      // onCloseCallback只有在append方法中调用shouldRotate方法的时候需要close文件的时候才会传入false,其他情况都是true
      runCloseAction();

      closed = true;
    }
}[/mw_shl_code]

上面是关闭方法,及重命名还有超时

// 没有配置hdfs.idleTimeout, 不会执行
if (idleFuture != null && !idleFuture.isDone()) {
idleFuture.cancel(false); // do not cancel myself if running!
idleFuture = null;
}
// 重命名文件,如果报错了,不会重命名文件
if (bucketPath != null && fileSystem != null && !failedToClose) {
// 将 /data/2015/07/20/15/flume.1437375933234.txt.tmp 重命名为 /data/2015/07/20/15/flume.1437375933234.txt
renameBucket(bucketPath, targetPath, fileSystem);
}
要么是超时的问题,要么是close的问题。其实楼主已经设置的超时时间不短了。
可是滚动的size太大了。

[mw_shl_code=bash,true]device_online_event.sinks.sinkHdfs.hdfs.rollSize = 125000000
[/mw_shl_code]
建议减小。当然也可以修改源码,失败的可以重试。



回复

使用道具 举报

arsenduan 发表于 2017-3-29 19:48:21
可以通过[mw_shl_code=bash,true]hadoop fsck -openforwrite
[/mw_shl_code]
命令查看发现有文件没有关闭。


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条