分享

Flume之FileChannel源码详解


问题导读


1.FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,能达到什么效果?
2.本文中FileChannel包含哪些类?
3.FileChannel中内部事务类、文件操作类你认为完成哪些功能?






FileChannel在Flume是一个非常重要的Channel,FileChannel可以很好的保证数据的完整性和一致性,提供了类似mysql binlog的机制,保证机器down机,JVM异常退出时数据不丢失,在采集数据量很大的情况下,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。

FileChannel的简易类结构:

1.png

FileChannel的内部事务类,FileBackedTransaction:

2.png

文件操作类:LogFile(LogFileV2在1.7已经被舍弃):

3.png

还有其他几个比较重要的类:
FlumeEventQueue,LogFile,Log,LogUtils。

一,初始化过程:public void configure(Context context)

1,useDualCheckpoints(是否需要备份检查点)
2,compressBackupCheckpoint(是否压缩备份节点)
3,checkpointDir(检查点目录,默认在${user.home}目录下)
4,dataDirs(数据节点目录)
5,capacity(获取配置的容量)
6,keepAlive(超时时间,就是如果channel中没有数据最长等待时间)
7,transactionCapacity(事务的最大容量)
注意:capacity的值一定要大于transactionCapacity,不然会报错,看源码:

[mw_shl_code=bash,true]Preconditions.checkState(transactionCapacity <= capacity,
      "File Channel transaction capacity cannot be greater than the " +
        "capacity of the channel.");[/mw_shl_code]

8,checkpointInterval(log的检查间隔)
9,maxFileSize(最大文件的大小,默认是1.5G)
10,minimumRequiredSpace(最少需要多少空间,默认是500M)
11,useLogReplayV1(使用旧重放逻辑)

12,useFastReplay(不使用队列重放)
13,keyProvider(KEY供应商的类型,支持的类型:JCEKSFILE)
14,activeKey(用于加密新数据的密钥名称)
15,cipherProvider(加密提供程序类型,支持的类型:AESCTRNOPADDING)

二,start()方法:



[mw_shl_code=java,true]@Override
  public synchronized void start() {
    LOG.info("Starting {}...", this);
    try {
      Builder builder = new Log.Builder();
      builder.setCheckpointInterval(checkpointInterval);
      builder.setMaxFileSize(maxFileSize);
      builder.setMinimumRequiredSpace(minimumRequiredSpace);
      builder.setQueueSize(capacity);
      builder.setCheckpointDir(checkpointDir);
      builder.setLogDirs(dataDirs);
      builder.setChannelName(getName());
      builder.setUseLogReplayV1(useLogReplayV1);
      builder.setUseFastReplay(useFastReplay);
      builder.setEncryptionKeyProvider(encryptionKeyProvider);
      builder.setEncryptionKeyAlias(encryptionActiveKey);
      builder.setEncryptionCipherProvider(encryptionCipherProvider);
      builder.setUseDualCheckpoints(useDualCheckpoints);
      builder.setCompressBackupCheckpoint(compressBackupCheckpoint);
      builder.setBackupCheckpointDir(backupCheckpointDir);
      builder.setFsyncPerTransaction(fsyncPerTransaction);
      builder.setFsyncInterval(fsyncInterval);
      builder.setCheckpointOnClose(checkpointOnClose);//以上是将configure方法获取到的参数,set到Builder对象
      log = builder.build();
      //builder.build();方法通过Builder创建Log对象
      //并且尝试获取checkpointDir和dataDir文件锁,Log类中的private void lock(File dir) throws IOException方法就是用来尝试过去锁的
      log.replay();
      //1,首先获取到checkpointDir的写锁
      //2,获取最大的fileID
      //3,读取log文件根据record的类型进行相应的操作,进行恢复;遍历所有的data目录
      //4,将queue刷新到相关文件
      open = true;//表示打开channel
      int depth = getDepth();
      
      Preconditions.checkState(queueRemaining.tryAcquire(depth),
          "Unable to acquire " + depth + " permits " + channelNameDescriptor);
      LOG.info("Queue Size after replay: " + depth + " "
           + channelNameDescriptor);
    } catch (Throwable t) {
      open = false;
      startupError = t;
      LOG.error("Failed to start the file channel " + channelNameDescriptor, t);
      if (t instanceof Error) {
        throw (Error) t;
      }
    }
    if (open) {
    //计数器开始统计
      channelCounter.start();
      channelCounter.setChannelSize(getDepth());
      channelCounter.setChannelCapacity(capacity);
    }
    super.start();
  }[/mw_shl_code]

org.apache.flume.channel.file.Log类用来将Event写入磁盘并将指向这些event的指针存入一个内存队列FlumeEventQueue中。并且启动一个线程,每过checkpointInterval毫秒写一次检查点log.writeCheckpoint()。
[mw_shl_code=java,true]workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),
        this.checkpointInterval, this.checkpointInterval,
        TimeUnit.MILLISECONDS);[/mw_shl_code]

[mw_shl_code=java,true]  static class BackgroundWorker implements Runnable {
    private static final Logger LOG = LoggerFactory
        .getLogger(BackgroundWorker.class);
    private final Log log;

    public BackgroundWorker(Log log) {
      this.log = log;
    }

    @Override
    public void run() {
      try {
        if (log.open) {
          log.writeCheckpoint();
          //将checpoint、inflightTakes、inflightPuts都刷新至磁盘,先后将inflightPuts、inflightTakes、checkpoint.meta重建,
          //更新checkpoint文件并刷新至磁盘,这些文件都在checkpointDir目录下;更新log-ID.meta文件;同时肩负起删除log文件及其对应的meta文件的责任。
        }
      } catch (IOException e) {
        LOG.error("Error doing checkpoint", e);
      } catch (Throwable e) {
        LOG.error("General error in checkpoint worker", e);
      }
    }
  }[/mw_shl_code]

三,事务
很多方法和Memory的事务类相似。如:doTake(),doCommit(),doRollback(),doPut()

下面详细的介绍这几个方法。
1,doPut():source会调用put方法

[mw_shl_code=java,true]@Override
    protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount();
      if(putList.remainingCapacity() == 0) {//是否有剩余空间
        throw new ChannelException("Put queue for FileBackedTransaction " +
            "of capacity " + putList.size() + " full, consider " +
            "committing more frequently, increasing capacity or " +
            "increasing thread count. " + channelNameDescriptor);
      }
      // this does not need to be in the critical section as it does not
      // modify the structure of the log or queue.
      if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {//尝试等待
        throw new ChannelFullException("The channel has reached it's capacity. "
            + "This might be the result of a sink on the channel having too "
            + "low of batch size, a downstream system running slower than "
            + "normal, or that the channel capacity is just too low. "
            + channelNameDescriptor);
      }
      boolean success = false;
      log.lockShared();//获取checkpoint的读锁,doTake()方法也会获取读锁,所以doTake和doPut只能操作一个,无法同时操作。
      try {
       //transactionID是在TransactionIDOracle类中递增的
        FlumeEventPointer ptr = log.put(transactionID, event);//将Event写入数据文件,使用RandomAccessFile。数据会缓存到inflightputs文件中
        Preconditions.checkState(putList.offer(ptr), "putList offer failed "
          + channelNameDescriptor);
        queue.addWithoutCommit(ptr, transactionID);//指针和事务ID加入到queue队列中。
        success = true;
      } catch (IOException e) {
        throw new ChannelException("Put failed due to IO error "
                + channelNameDescriptor, e);
      } finally {
        log.unlockShared();//释放读锁
        if(!success) {
          // release slot obtained in the case
          // the put fails for any reason
          queueRemaining.release();//释放信号量
        }
      }
    }[/mw_shl_code]

2,doTake():sink会调用put方法
[mw_shl_code=java,true] protected Event doTake() throws InterruptedException {  
      channelCounter.incrementEventTakeAttemptCount();  
      if(takeList.remainingCapacity() == 0) {  
        throw new ChannelException("Take list for FileBackedTransaction, capacity " +  
            takeList.size() + " full, consider committing more frequently, " +  
            "increasing capacity, or increasing thread count. "  
               + channelNameDescriptor);  
      }  
      log.lockShared();//获取锁  
      /*
       * 1. Take an event which is in the queue.
       * 2. If getting that event does not throw NoopRecordException,
       *    then return it.
       * 3. Else try to retrieve the next event from the queue
       * 4. Repeat 2 and 3 until queue is empty or an event is returned.
       */  
  
      try {  
        while (true) {  
          FlumeEventPointer ptr = queue.removeHead(transactionID);//获取文件指针,ptr的数据结构是fileID和offset  
          if (ptr == null) {  
            return null;  
          } else {  
            try {  
              // first add to takeList so that if write to disk  
              // fails rollback actually does it's work  
              Preconditions.checkState(takeList.offer(ptr),  
                "takeList offer failed "  
                  + channelNameDescriptor);  
              log.take(transactionID, ptr); // write take to disk  
              Event event = log.get(ptr);//根据文件指针,使用log对象在磁盘中获取到Event。数据会缓存到inflighttakes文件中  
              return event;  
            } catch (IOException e) {  
              throw new ChannelException("Take failed due to IO error "  
                + channelNameDescriptor, e);  
            } catch (NoopRecordException e) {  
              LOG.warn("Corrupt record replaced by File Channel Integrity " +  
                "tool found. Will retrieve next event", e);  
              takeList.remove(ptr);  
            } catch (CorruptEventException ex) {  
              if (fsyncPerTransaction) {  
                throw new ChannelException(ex);  
              }  
              LOG.warn("Corrupt record found. Event will be " +  
                "skipped, and next event will be read.", ex);  
              takeList.remove(ptr);  
            }  
          }  
        }  
      } finally {  
        log.unlockShared();//释放锁  
      }  
    }  [/mw_shl_code]

3,doCommit():source和sink都会调用该方法提交事务

[mw_shl_code=java,true]    @Override
    protected void doCommit() throws InterruptedException {
      int puts = putList.size();
      int takes = takeList.size();
      if(puts > 0) {//puts和takes不能同时都>0,其中有一个得是等于零
        Preconditions.checkState(takes == 0, "nonzero puts and takes "
                + channelNameDescriptor);
        log.lockShared();//获取锁
        try {
          log.commitPut(transactionID);//该操作会封装成一个ByteBuffer类型写入到文件,
          channelCounter.addToEventPutSuccessCount(puts);
          synchronized (queue) {
            while(!putList.isEmpty()) {
              if(!queue.addTail(putList.removeFirst())) {
                StringBuilder msg = new StringBuilder();
                msg.append("Queue add failed, this shouldn't be able to ");
                msg.append("happen. A portion of the transaction has been ");
                msg.append("added to the queue but the remaining portion ");
                msg.append("cannot be added. Those messages will be consumed ");
                msg.append("despite this transaction failing. Please report.");
                msg.append(channelNameDescriptor);
                LOG.error(msg.toString());
                Preconditions.checkState(false, msg.toString());
              }
            }
            queue.completeTransaction(transactionID);//清空checkpoint文件夹中inflightputs和inflighttakes文件的内容
          }
        } catch (IOException e) {
          throw new ChannelException("Commit failed due to IO error "
                  + channelNameDescriptor, e);
        } finally {
          log.unlockShared();//释放锁
        }

      } else if (takes > 0) {
        log.lockShared();//释放锁
        try {
          log.commitTake(transactionID);//写入data文件
          queue.completeTransaction(transactionID);//和上面操作一样
          channelCounter.addToEventTakeSuccessCount(takes);
        } catch (IOException e) {
          throw new ChannelException("Commit failed due to IO error "
              + channelNameDescriptor, e);
        } finally {
          log.unlockShared();
        }
        queueRemaining.release(takes);
      }
      putList.clear();
      takeList.clear();//清空两个队列
      channelCounter.setChannelSize(queue.getSize());
    }[/mw_shl_code]

4,doRollback():source和sink都会调用该方法回滚数据

[mw_shl_code=java,true]    @Override
    protected void doRollback() throws InterruptedException {
      int puts = putList.size();
      int takes = takeList.size();
      log.lockShared();
      try {
        if(takes > 0) {
          Preconditions.checkState(puts == 0, "nonzero puts and takes "
              + channelNameDescriptor);
          synchronized (queue) {
            while (!takeList.isEmpty()) {
              Preconditions.checkState(queue.addHead(takeList.removeLast()),
                  "Queue add failed, this shouldn't be able to happen "
                      + channelNameDescriptor);
            }
          }
        }
        putList.clear();
        takeList.clear();
        queue.completeTransaction(transactionID);
        channelCounter.setChannelSize(queue.getSize());
        log.rollback(transactionID);//也是封装成ByteBuffer,写入到缓存文件中。
      } catch (IOException e) {
        throw new ChannelException("Commit failed due to IO error "
            + channelNameDescriptor, e);
      } finally {
        log.unlockShared();
        // since rollback is being called, puts will never make it on
        // to the queue and we need to be sure to release the resources
        queueRemaining.release(puts);
      }
    }[/mw_shl_code]


Flame的FileChannel在系统崩溃的时候保证数据的完整性和一致性,其实是通过JDK的字节通道实现的(java.nio.channels.FileChannel),字节通道为了保证数据在系统崩溃之后不丢失数据,文件的修改模式会被强制到底层存储设备。

最后看下Flume FileChannel的文件结构:
checkpoint目录:

4.png
checkpoint:存放Event在那个data文件logFileID的什么位置offset等信息。
inflighttakes:存放的是事务take的缓存数据,每隔段时间就重建文件。
内容:
1、16字节是校验码;
2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;
3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...
inflightputs:存放的是事务对应的put缓存数据,每隔段时间就重建文件。
内容:
1、16字节是校验码;
2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;
3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...
checkpoint.meta:主要存储的是logfileID及对应event的数量等信息。
data目录:

5.png
log-ID.meta:主要记录log-ID下一个写入位置以及logWriteOrderID等信息。
log-ID:数据文件,目录里数据文件保持不超过2个。

FileChannel实现比较复杂,先写这么多,以后有需要细细了解。



欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

已有(5)人评论

跳转到指定楼层
zxmit 发表于 2015-8-17 12:28:52
回复

使用道具 举报

zxmit 发表于 2015-8-27 15:43:29
期待更详尽的内容
回复

使用道具 举报

为了明天time 发表于 2015-9-21 15:24:51
终于知道checkpoint目录存的什么了  
回复

使用道具 举报

为了明天time 发表于 2015-9-21 15:25:24
回复

使用道具 举报

zhangjiajie 发表于 2016-2-22 15:39:28
想问一下data的东西。
data目录:


log-ID.meta:主要记录log-ID下一个写入位置以及logWriteOrderID等信息。
log-ID:数据文件,目录里数据文件保持不超过2个。

FileChannel实现比较复杂,先写这么多,以后有需要细细了解。

我的flume里面这个下面的文件还是比较大的,这里面存的是什么,可以说的更详细吗,可以删除吗
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条