问题导读
1.FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,能达到什么效果?
2.本文中FileChannel包含哪些类?
3.FileChannel中内部事务类、文件操作类你认为完成哪些功能?
FileChannel在Flume是一个非常重要的Channel,FileChannel可以很好的保证数据的完整性和一致性,提供了类似mysql binlog的机制,保证机器down机,JVM异常退出时数据不丢失,在采集数据量很大的情况下,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
FileChannel的简易类结构:
FileChannel的内部事务类,FileBackedTransaction:
文件操作类:LogFile(LogFileV2在1.7已经被舍弃):
还有其他几个比较重要的类:
FlumeEventQueue,LogFile,Log,LogUtils。
一,初始化过程:public void configure(Context context)
1,useDualCheckpoints(是否需要备份检查点)
2,compressBackupCheckpoint(是否压缩备份节点)
3,checkpointDir(检查点目录,默认在${user.home}目录下)
4,dataDirs(数据节点目录)
5,capacity(获取配置的容量)
6,keepAlive(超时时间,就是如果channel中没有数据最长等待时间)
7,transactionCapacity(事务的最大容量)
注意:capacity的值一定要大于transactionCapacity,不然会报错,看源码:
[mw_shl_code=bash,true]Preconditions.checkState(transactionCapacity <= capacity,
"File Channel transaction capacity cannot be greater than the " +
"capacity of the channel.");[/mw_shl_code]
8,checkpointInterval(log的检查间隔)
9,maxFileSize(最大文件的大小,默认是1.5G)
10,minimumRequiredSpace(最少需要多少空间,默认是500M)
11,useLogReplayV1(使用旧重放逻辑)
12,useFastReplay(不使用队列重放)
13,keyProvider(KEY供应商的类型,支持的类型:JCEKSFILE)
14,activeKey(用于加密新数据的密钥名称)
15,cipherProvider(加密提供程序类型,支持的类型:AESCTRNOPADDING)
二,start()方法:
[mw_shl_code=java,true]@Override
public synchronized void start() {
LOG.info("Starting {}...", this);
try {
Builder builder = new Log.Builder();
builder.setCheckpointInterval(checkpointInterval);
builder.setMaxFileSize(maxFileSize);
builder.setMinimumRequiredSpace(minimumRequiredSpace);
builder.setQueueSize(capacity);
builder.setCheckpointDir(checkpointDir);
builder.setLogDirs(dataDirs);
builder.setChannelName(getName());
builder.setUseLogReplayV1(useLogReplayV1);
builder.setUseFastReplay(useFastReplay);
builder.setEncryptionKeyProvider(encryptionKeyProvider);
builder.setEncryptionKeyAlias(encryptionActiveKey);
builder.setEncryptionCipherProvider(encryptionCipherProvider);
builder.setUseDualCheckpoints(useDualCheckpoints);
builder.setCompressBackupCheckpoint(compressBackupCheckpoint);
builder.setBackupCheckpointDir(backupCheckpointDir);
builder.setFsyncPerTransaction(fsyncPerTransaction);
builder.setFsyncInterval(fsyncInterval);
builder.setCheckpointOnClose(checkpointOnClose);//以上是将configure方法获取到的参数,set到Builder对象
log = builder.build();
//builder.build();方法通过Builder创建Log对象
//并且尝试获取checkpointDir和dataDir文件锁,Log类中的private void lock(File dir) throws IOException方法就是用来尝试过去锁的
log.replay();
//1,首先获取到checkpointDir的写锁
//2,获取最大的fileID
//3,读取log文件根据record的类型进行相应的操作,进行恢复;遍历所有的data目录
//4,将queue刷新到相关文件
open = true;//表示打开channel
int depth = getDepth();
Preconditions.checkState(queueRemaining.tryAcquire(depth),
"Unable to acquire " + depth + " permits " + channelNameDescriptor);
LOG.info("Queue Size after replay: " + depth + " "
+ channelNameDescriptor);
} catch (Throwable t) {
open = false;
startupError = t;
LOG.error("Failed to start the file channel " + channelNameDescriptor, t);
if (t instanceof Error) {
throw (Error) t;
}
}
if (open) {
//计数器开始统计
channelCounter.start();
channelCounter.setChannelSize(getDepth());
channelCounter.setChannelCapacity(capacity);
}
super.start();
}[/mw_shl_code]
org.apache.flume.channel.file.Log类用来将Event写入磁盘并将指向这些event的指针存入一个内存队列FlumeEventQueue中。并且启动一个线程,每过checkpointInterval毫秒写一次检查点log.writeCheckpoint()。
[mw_shl_code=java,true]workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),
this.checkpointInterval, this.checkpointInterval,
TimeUnit.MILLISECONDS);[/mw_shl_code]
[mw_shl_code=java,true] static class BackgroundWorker implements Runnable {
private static final Logger LOG = LoggerFactory
.getLogger(BackgroundWorker.class);
private final Log log;
public BackgroundWorker(Log log) {
this.log = log;
}
@Override
public void run() {
try {
if (log.open) {
log.writeCheckpoint();
//将checpoint、inflightTakes、inflightPuts都刷新至磁盘,先后将inflightPuts、inflightTakes、checkpoint.meta重建,
//更新checkpoint文件并刷新至磁盘,这些文件都在checkpointDir目录下;更新log-ID.meta文件;同时肩负起删除log文件及其对应的meta文件的责任。
}
} catch (IOException e) {
LOG.error("Error doing checkpoint", e);
} catch (Throwable e) {
LOG.error("General error in checkpoint worker", e);
}
}
}[/mw_shl_code]
三,事务
很多方法和Memory的事务类相似。如:doTake(),doCommit(),doRollback(),doPut()
下面详细的介绍这几个方法。
1,doPut():source会调用put方法
[mw_shl_code=java,true]@Override
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
if(putList.remainingCapacity() == 0) {//是否有剩余空间
throw new ChannelException("Put queue for FileBackedTransaction " +
"of capacity " + putList.size() + " full, consider " +
"committing more frequently, increasing capacity or " +
"increasing thread count. " + channelNameDescriptor);
}
// this does not need to be in the critical section as it does not
// modify the structure of the log or queue.
if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {//尝试等待
throw new ChannelFullException("The channel has reached it's capacity. "
+ "This might be the result of a sink on the channel having too "
+ "low of batch size, a downstream system running slower than "
+ "normal, or that the channel capacity is just too low. "
+ channelNameDescriptor);
}
boolean success = false;
log.lockShared();//获取checkpoint的读锁,doTake()方法也会获取读锁,所以doTake和doPut只能操作一个,无法同时操作。
try {
//transactionID是在TransactionIDOracle类中递增的
FlumeEventPointer ptr = log.put(transactionID, event);//将Event写入数据文件,使用RandomAccessFile。数据会缓存到inflightputs文件中
Preconditions.checkState(putList.offer(ptr), "putList offer failed "
+ channelNameDescriptor);
queue.addWithoutCommit(ptr, transactionID);//指针和事务ID加入到queue队列中。
success = true;
} catch (IOException e) {
throw new ChannelException("Put failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();//释放读锁
if(!success) {
// release slot obtained in the case
// the put fails for any reason
queueRemaining.release();//释放信号量
}
}
}[/mw_shl_code]
2,doTake():sink会调用put方法
[mw_shl_code=java,true] protected Event doTake() throws InterruptedException {
channelCounter.incrementEventTakeAttemptCount();
if(takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for FileBackedTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count. "
+ channelNameDescriptor);
}
log.lockShared();//获取锁
/*
* 1. Take an event which is in the queue.
* 2. If getting that event does not throw NoopRecordException,
* then return it.
* 3. Else try to retrieve the next event from the queue
* 4. Repeat 2 and 3 until queue is empty or an event is returned.
*/
try {
while (true) {
FlumeEventPointer ptr = queue.removeHead(transactionID);//获取文件指针,ptr的数据结构是fileID和offset
if (ptr == null) {
return null;
} else {
try {
// first add to takeList so that if write to disk
// fails rollback actually does it's work
Preconditions.checkState(takeList.offer(ptr),
"takeList offer failed "
+ channelNameDescriptor);
log.take(transactionID, ptr); // write take to disk
Event event = log.get(ptr);//根据文件指针,使用log对象在磁盘中获取到Event。数据会缓存到inflighttakes文件中
return event;
} catch (IOException e) {
throw new ChannelException("Take failed due to IO error "
+ channelNameDescriptor, e);
} catch (NoopRecordException e) {
LOG.warn("Corrupt record replaced by File Channel Integrity " +
"tool found. Will retrieve next event", e);
takeList.remove(ptr);
} catch (CorruptEventException ex) {
if (fsyncPerTransaction) {
throw new ChannelException(ex);
}
LOG.warn("Corrupt record found. Event will be " +
"skipped, and next event will be read.", ex);
takeList.remove(ptr);
}
}
}
} finally {
log.unlockShared();//释放锁
}
} [/mw_shl_code]
3,doCommit():source和sink都会调用该方法提交事务
[mw_shl_code=java,true] @Override
protected void doCommit() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
if(puts > 0) {//puts和takes不能同时都>0,其中有一个得是等于零
Preconditions.checkState(takes == 0, "nonzero puts and takes "
+ channelNameDescriptor);
log.lockShared();//获取锁
try {
log.commitPut(transactionID);//该操作会封装成一个ByteBuffer类型写入到文件,
channelCounter.addToEventPutSuccessCount(puts);
synchronized (queue) {
while(!putList.isEmpty()) {
if(!queue.addTail(putList.removeFirst())) {
StringBuilder msg = new StringBuilder();
msg.append("Queue add failed, this shouldn't be able to ");
msg.append("happen. A portion of the transaction has been ");
msg.append("added to the queue but the remaining portion ");
msg.append("cannot be added. Those messages will be consumed ");
msg.append("despite this transaction failing. Please report.");
msg.append(channelNameDescriptor);
LOG.error(msg.toString());
Preconditions.checkState(false, msg.toString());
}
}
queue.completeTransaction(transactionID);//清空checkpoint文件夹中inflightputs和inflighttakes文件的内容
}
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();//释放锁
}
} else if (takes > 0) {
log.lockShared();//释放锁
try {
log.commitTake(transactionID);//写入data文件
queue.completeTransaction(transactionID);//和上面操作一样
channelCounter.addToEventTakeSuccessCount(takes);
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
}
queueRemaining.release(takes);
}
putList.clear();
takeList.clear();//清空两个队列
channelCounter.setChannelSize(queue.getSize());
}[/mw_shl_code]
4,doRollback():source和sink都会调用该方法回滚数据
[mw_shl_code=java,true] @Override
protected void doRollback() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
log.lockShared();
try {
if(takes > 0) {
Preconditions.checkState(puts == 0, "nonzero puts and takes "
+ channelNameDescriptor);
synchronized (queue) {
while (!takeList.isEmpty()) {
Preconditions.checkState(queue.addHead(takeList.removeLast()),
"Queue add failed, this shouldn't be able to happen "
+ channelNameDescriptor);
}
}
}
putList.clear();
takeList.clear();
queue.completeTransaction(transactionID);
channelCounter.setChannelSize(queue.getSize());
log.rollback(transactionID);//也是封装成ByteBuffer,写入到缓存文件中。
} catch (IOException e) {
throw new ChannelException("Commit failed due to IO error "
+ channelNameDescriptor, e);
} finally {
log.unlockShared();
// since rollback is being called, puts will never make it on
// to the queue and we need to be sure to release the resources
queueRemaining.release(puts);
}
}[/mw_shl_code]
Flame的FileChannel在系统崩溃的时候保证数据的完整性和一致性,其实是通过JDK的字节通道实现的(java.nio.channels.FileChannel),字节通道为了保证数据在系统崩溃之后不丢失数据,文件的修改模式会被强制到底层存储设备。
最后看下Flume FileChannel的文件结构:
checkpoint目录:
checkpoint:存放Event在那个data文件logFileID的什么位置offset等信息。
inflighttakes:存放的是事务take的缓存数据,每隔段时间就重建文件。
内容:
1、16字节是校验码;
2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;
3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...
inflightputs:存放的是事务对应的put缓存数据,每隔段时间就重建文件。
内容:
1、16字节是校验码;
2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;
3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...
checkpoint.meta:主要存储的是logfileID及对应event的数量等信息。
data目录:
log-ID.meta:主要记录log-ID下一个写入位置以及logWriteOrderID等信息。
log-ID:数据文件,目录里数据文件保持不超过2个。
FileChannel实现比较复杂,先写这么多,以后有需要细细了解。
|
|