分享

Flume-ng FileChannel原理解析

xioaxu790 发表于 2014-8-10 15:08:43 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 9037
问题导读
1、什么是FileChannel?
2、如何实现FileChannel?
3、当实例crash时,通过l什么来恢复queue的状态?





对于Flume来说主要有两个Channel:Memory,File;对于线上环境主要以FileChannel为主,因此这里主要讨论它的实现:
在FileChannel里主要由一个WAL的log和一个内存队列组成:
FileChannel的Queue主要又以下几个部分组成:
  1. private final EventQueueBackingStore backingStore;
  2. private final InflightEventWrapper inflightTakes;
  3. private final InflightEventWrapper inflightPuts;
复制代码


其中backingStore代表了queue在持久化存在,使用了内存映射文件的方式;每次对queue的读写操作都记录在backingStore的overwritemap(update in place)中,当进行checkpoint的时候合并到elementsBuffer并持久化到磁盘;所有未提交的正在读写数据都分别保存在inflight结构中,当checkpoint时一并进行持久化,为回滚时使用;
在inflight中存储了transactionid->fileid以及transactionid->eventptr的映射,具体存储在backingStore里的则是eventptr(fileid,offset);
Checkpoint file的文件结构如下:
  1. File Header:1029 bytes
  2. Eventptr;
复制代码


在File header里前8个字节存储了版本号,接下来24个字节是sequeuece no.(类似rdbms的scn),接下来4个字节存储了checkpoint的状态;
作为WAL的Log主要存储了(transactionid,sequenceNo,Event),每次读写都先在log里写入event,对于写操作会拿到eventptr放入queue中;而commit和rollback操作在log中的记录形式是(transactionid,sequenceNo,OP={commit,rollback});
这两个结构主要是体现在FileBackedTransaction中如下:
  1. FileBackedTransaction extends BasicTransactionSemantics
  2. ......
  3. LinkedBlockingDeque<FlumeEventPointer> takeList;
  4. LinkedBlockingDeque<FlumeEventPointer> putList;
  5. long transactionID;
  6. Log log;
  7. FlumeEventQueue queue: EventQueueBackingStoreFile
  8. 其中queue = log.getFlumeEventQueue();
复制代码


首先看put/take path以及commit:
  1. 1. doPut(Event event)->
  2. queue.addWithoutCommit(ptr, transactionID)
  3. log.put(transactionID, event)->
  4. synchronized LogFile.Writer.put(ByteBuffer buffer)
  5. putList.offer(ptr)
  6. 2. doTake() ->
  7. FlumeEventPointer ptr = queue.removeHead(transactionID);
  8. takeList.offer(ptr),
  9. log.take(transactionID, ptr); ->
  10. synchronized LogFile.Writer.take(ByteBuffer buffer)
  11. Event event = log.get(ptr);
  12. 3. doCommit()->
  13. if(puts > 0) {
  14. log.commitPut(transactionID);
  15. synchronized (queue) {
  16. while(!putList.isEmpty()) {
  17. queue.addTail(putList.removeFirst())
  18. queue.completeTransaction(transactionID);
  19. }
  20. }
  21. else if (takes > 0) {
  22. log.commitTake(transactionID);->
  23. logFileWriter.commit(buffer);
  24. logFileWriter.sync();
  25. queue.completeTransaction(transactionID);
  26. queueRemaining.release(takes);
  27. }
  28. }
复制代码


从上面的代码可以看出,对于每一个put/take都会记录一条oplog到log里,当commit的时候会对log进行sync到磁盘持久化,同时会把event指针存放到queue上;这里的log就类似于mysql里的binlog(binlog_format=statement),而这里的queue存放的是指向event的指针;
简例:FileChannel如下,对FileChannel put了2个消息,a,b;则在log,queue里的存储状态如下,Log里存储了(transactionid,sequenceNo,Event),queue则存储了eventptr;
  1. Queue:ptr->a,ptr->b
  2. WAL log:(1,1,put a),(1,2,put b),(1,3,commit)
复制代码


当实例crash时,通过log来恢复queue的状态,类似rdbms一样,replay是很耗时的操作,因此会定期对queue进行checkpoint:
Log在初始化的时候会启动一个调度线程workerExecutor,由调度线程定期(checkpoint interval)调度一个backgroupWorkder来进行非强制性
  1. checkpoint;
  2. Log.writeCheckpoint(Boolean force): tryLockExclusive->
  3. synchronized queue.checkpoint->
  4. backingStore.beginCheckpoint();//检查是否checkpoint正在进行;同时进行标记checkpoint开始,并同步MMAP file;
  5. inflightPuts.serializeAndWrite();//
  6. inflightTakes.serializeAndWrite();//将inflightputs/takes序列化并写到相应文件
  7. backingStore.checkpoint();->
  8. setLogWriteOrderID(WriteOrderOracle.next());
  9. writeCheckpointMetaData();
  10. //copy from overwriteMap to elementsBuffer(MMAP)
  11. //标记checkpoint结束,并同步文件
复制代码


简例:接上例,在a,b提交后,这时进行了一次checkpoint(存储在磁盘上的checkpoint则是2个指针ptr->a,ptr->b),此时scn=4;之后,又完成了一个take transaction ,ptr to a 也同时被删除;如果这时Flume crash,queue从checkpoint中重建,并且取得checkpoint scn=4,则replay这之后的log进行crash recovery;在恢复后,立刻执行一次checkpoint.
  1. queue:ptr->b
  2. WAL log:(1,1,put a),(1,2,put b),(1,3,commit),(2,5,take a),(2,6,commit)
  3. [@more@]
复制代码




本文转自:http://blog.itpub.net/11676357/viewspace-1060911

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条