问题导读:
1.什么是BookKeeper背景?
2.BookKeeper in HDFS如何实现对EditLog的读出和写回?
3.写日志都包含哪些流程?
BookKeeper背景 BK是一个可靠的日志流记录系统,用于将系统产生的日志(也可以是其他数据)记录在BK集群上,由BK这个第三方Storage保证数据存储的可靠和一致性。典型场景是系统写write-ahead log,即先把log写到BK上,再对log做处理,比如将log写到内存的数据结构中。BookKeeper同时适用于任何单点写入并要求保证高性能和数据不丢失(Strong Durabilty Guarantees)的场景。
BK诞生于Hadoop2.0的namenode HA。在Hadoop中,出于故障恢复的考虑,Namenode在对它的记录做修改前都会先将本条修改的日志写到磁盘上。但是这里有一个潜在问题,当Namenode发生故障时,很可能连本地磁盘也不能访问,这时之前的记录的日志也就没用了。基于上述考虑,可以将Namenode的日志信息保存在一个可靠的外部Storage中。最初业界通过NFS这样的Share Storage来实现日志同步。之所以选择NFS,一方面因为可以很方便地实现数据共享,另外一方面是因为NFS相对稳定成熟。虽然如此,NFS也有缺点不能满足HDFS的在线存储业务:网络单点及其存储节点单点。为了满足共享日志的高可用性,社区引入了BK。除此之外还有默认的HA方案:QJM。
BookKeeper介绍
BK带有多个读写日志的server,称为 bookies。每一个bookie是一个bk的存储服务,存储了写到bk上的write-ahead日志,及其数据内容。写入的log流(称它为流是因为BK记录的是byte[])称为 ledgers,一个ledger是一个日志文件,每个日志单元叫 ledger entry,也就是bookies是存ledgers的。ledger只支持append操作,而且同时只能有一个单线程来写。ZK充当BK的元数据存储服务,在zk中会存储ledger相关的元数据,包括当前可用的bookies,ledger分布的位置等。
BK通过读写多个存储节点达到高可用性,同时为了恢复由于异常造成的多节点数据不一致性,引入了数据一致性算法。BK的可用性还体现在只要有足够多的bookies可用,整个服务就可用。实际上,一份entry的写入需要确保N份日志冗余在N个bookie上写成功,而我们需要>N个bookie提供服务。在启动BK的时候,需要指定一个ensemble值,即bookie可用的最小节点数量,还需要指定一个quorums值,即日志写入bk服务端的冗余份数。BK的可靠性体现在服务有多个备份,entry的记录也是冗余的。BK的可扩展性体现在可以增加bookie服务的定额数目,同时增加server数据可以一定程度提高吞吐量。
Ledger在BK中扮演了很重要的角色,其相关操作及其作用如下:
CreateLedger:创建一个空的ledger,此时会在zk中存储相关元数据; AddEntry:添加一个记录到ledger中,如果客户端失败或者ledger已经关闭,则不能再追加entry; openLedger:开始读取数据前,必须先打开ledger,如果某ledger处于未关闭,不能读取相关数据,如果有异常,需先恢复; readEntries:读取ledger中的entry
从编码角度讲,操纵entry读写的类为LedgerHandle,LedgerHandle对应一个可以被client读写entry的ledger。下面是创建ledgerHandle并读写entry的例子。
ClientConfiguration conf = new ClientConfiguration();
conf.setZkServers("zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181");
BookKeeper client = new BookKeeper(conf);
LedgerHandle lh = client.createLedger(3, 2, DigestType.CRC32, "foobar");
lh.addEntry("Hello World!".getBytes());
lh.close();
LedgerHandle lh2 = client.openLedger(1, DigestType.CRC32, "foobar");
long lastEntry = lh2.getLastAddConfirmed();
Enumeration<LedgerEntry> entries = lh2.readEntries(0, 9);
while (entries.hasMoreElements()) {
byte[] bytes = entries.nextElement().getEntry();
System.out.println(new String(bytes));
}
复制代码
更多BK文档可以参考官网文档 。
BookKeeper in HDFS Hdfs有两个抽象类提供对EditLog的读出和写回:EditLogOutputStream(以下简称ELOS)和EditLogInputStream(以下简称ELIS)。同时还有一个JournalManager接口,负责管理EditLog的可靠存取。它的实现包括QJM(QuorumJournalManager)和BKJM(BookKeeperJournalManager)。
写日志
对于hdfs而言,主节点写的每一个日志对象为BK的entry,entry的集合组成一个ledger,每一个日志段对应一个ledger,相同日志段追加edits即为向ledger追加entry。Ledger有一个递增的ledgerId,entry也有递增的entryId,每个entryId对应一个txId。
ELOS使用write()将FSEditLogOp往外写,对应的BookKeeperEditLogOutputStream的实现为:
@Override
public void write(FSEditLogOp op) throws IOException {
writer.writeOp(op);
if (bufCurrent.getLength() > transmissionThreshold) {
transmit();
}
}
复制代码
BookKeeperEditLogOutputStream内部有一个buffer,每次调用write()写FSEditLogOp的时候,会由一个Writer将此次FSEditLogOp写入buffer,当buffer长度达到门槛值时,进行transmit操作:把buffer里的editLog发送到BK上,代码如下:
/**
* Transmit the current buffer to bookkeeper.
* Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
* are never called at the same time.
*/
private void transmit() throws IOException {
if (!transmitResult.compareAndSet(BKException.Code.OK,
BKException.Code.OK)) {
throw new IOException("Trying to write to an errored stream;"
+ " Error code : (" + transmitResult.get()
+ ") " + BKException.getMessage(transmitResult.get()));
}
if (bufCurrent.getLength() > 0) {
byte[] entry = Arrays.copyOf(bufCurrent.getData(),
bufCurrent.getLength());
lh.asyncAddEntry(entry, this, null);
bufCurrent.reset();
outstandingRequests.incrementAndGet();
}
}
复制代码
lh为BK的LedgerHandle,asyncAddEntry方法异步将entry写往一个open状态的ledger。这就是一个简单的把Editlog写往BK的过程。
BKJM简单写的代码如下:
public void testSimpleWrite() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
String zkpath = bkjm.finalizedLedgerZNode(1, 100);
}
复制代码
BKJM的startLogSegment(txId)将产生一个新的ledger,对应一个新的日志段,该日志段状态为接收写入日志的状态。创建ledger之前有一些校验工作
if (txId <= maxTxId.get()) {
throw new IOException("We've already seen " + txId
+ ". A new stream cannot be created with it");
}
try {
String existingInprogressNode = ci.read();
if (null != existingInprogressNode
&& zkc.exists(existingInprogressNode, false) != null) {
throw new IOException("Inprogress node already exists");
}
if (currentLedger != null) {
// bookkeeper errored on last stream, clean up ledger
currentLedger.close();
}
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
BookKeeper.DigestType.MAC,
digestpw.getBytes());
} catch (BKException bke) {
throw new IOException("Error creating ledger", bke);
} catch (KeeperException ke) {
throw new IOException("Error in zookeeper while creating ledger", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted creating ledger", ie);
}
复制代码
Ledger的创建还对应一个新的EditLogLedgerMetadata,该类记录这个日志段的元信息,包括zkPath,ledgerId,开始和结束txId等,在读取ledger里的日志内容的时候需要这些元数据信息。
BKJM的finalizeLogSegment()将文件由正在写入日志的状态转化为不接收写日志的状态。BKJM会create ledger,delete ledger,open ledger,这里的ledger即LedgerHandler类,它对每个ledger entry进行读写操作。
写日志总体流程
ZK作为BK的元数据服务器,里面存储了哪些bookie服务是可用的,同时也记录了目前系统有哪些ledger,及其ledger相关信息,如该ledger数据存储在哪些机器上,及其该ledger起始,结束entryid等。Bookie节点存储实际的数据,及其数据的读写服务。
写操作由主节点来完成,当主节点调用setReadyToFlush操作,会调用RPC同时向N(N=quorums)个bookie节点写,flush异步等待响应。
主节点对bk的操作,其实就是对ledger的操作,在开始向bk服务写数据前,首先需要打开ledger,打开ledger就会与配置的所有bookie节点建立连接;打开连接后,数据以entry为单位以RR算法选择向N(N=quorums)个bookie节点写entry数据,并且异步地等待结果返回,有任何一个bookie写入失败,则需要重新选择一个bookie写入失败的副本。
当bookie服务端接收到写入数据后,首先会写日志,然后根据同步或者异步算法将数据同步到磁盘上。写入数据过程中,首先会写入log文件,写入的内容包含ledgerid,entryid,EntrySize,LastConfirmed,及其真实数据内容。然后在相应ledger文件中记录下entryid,及其该entry所在的日志文件,偏移量等。
读日志 读日志相比写日志过程,相对简单一些。同样,读日志过程也支持高可用。BKJM通过selectInputStreams方法读出一个范围内的ELIS集合,每个ELIS是BookKeeperEditLogInputStream类,new BookKeeperEditLogInputStream需要得到一个EditLogLedgerMetadata,并打开对应的ledger。具体BookKeeperEditLogInputStream类里的内容就不详细说明了。
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean forReading)
throws IOException {
List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
inProgressOk);
try {
BookKeeperEditLogInputStream elis = null;
for (EditLogLedgerMetadata l : currentLedgerList) {
long lastTxId = l.getLastTxId();
if (l.isInProgress()) {
lastTxId = recoverLastTxId(l, false);
}
// Check once again, required in case of InProgress and is case of any
// gap.
if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
LedgerHandle h;
if (l.isInProgress()) { // we don't want to fence the current journal
h = bkc.openLedgerNoRecovery(l.getLedgerId(),
BookKeeper.DigestType.MAC, digestpw.getBytes());
} else {
h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
digestpw.getBytes());
}
elis = new BookKeeperEditLogInputStream(h, l);
elis.skipTo(fromTxId);
} else {
// If mismatches then there might be some gap, so we should not check
// further.
return;
}
streams.add(elis);
if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
return;
}
fromTxId = elis.getLastTxId() + 1;
}
} catch (BKException e) {
throw new IOException("Could not open ledger for " + fromTxId, e);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
}
}
复制代码
首先选择日志文件,建立输入流。从节点触发消化日志后,首先会查询ZK,获取到主节点写入ZK的edits元数据信息(不包含inprocess状态的edits元数据),这个元数据包含日志段的startTxid,lastTxid,ledgerID,同时也会打开相应的ledger,并获取其元数据,如ledger的quorumSize,ensembleSize,lastEntryId等,同时按照txid先后顺序对ledger进行排序,放入输入流集合。需要强调的是,当打开ledger时,会检查其entry副本之间的一致性,如果不一致需恢复。
准备好输入流以后,开始消化日志,依次操作输入流集合的ledgers,读取每个ledger内的entry:
通过查询ledger元数据,同时通过RR算法确定该entry存储在哪几个bookies; 尝试从bookies集合的第一个bookie服务读取entry,如果成功,该entry就读取成功,如果失败,转入第3步; 尝试从bookies集合的第二个bookie服务读取entry,如果成功,该entry就读取成功,如果失败,依次类推,如果尝试读取完所有的bookies均失败,则该entry读取失败;
恢复
BKJM还有恢复机制,相关接口有recoverUnfinalizedSegments(),recoverLastTxId()。Bookie数据恢复检查通过定时或者人工发起,集群数据修复流程:
通过zk查询到ledger元数据; 通过元数据,查询相关bookie中存储的ledger的entry是否完整; 如果查询到存储在某bookie上的entry不完整,则需要进入数据恢复流程; 首先从bk服务端读取到ledger相关的entry,然后将其写到需要恢复entry的某bookie服务端; Ledger数据恢复完成后,需要更新ledger的segment相关元数据。
总结
本文首先介绍了BookKeeper的背景和使用场景,然后简单介绍了BK的主要部件及使用方法,最后粗略地分析了hadoop2.0 namenode BKJM的HA实现,介绍了EditLog写入和读出BK的过程。通过阅读hadoopBKJM部分的代码,帮助学习怎样在自己的系统里加入BookKeeper,让BK来保证日志的可靠和容灾恢复等功能。