分享

BookKeeper设计介绍及其在Hadoop2.0 Namenode HA方案中的使用分析

howtodown 发表于 2014-9-5 11:00:08 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 12510
问题导读:

1.什么是BookKeeper背景?
2.BookKeeper in HDFS如何实现对EditLog的读出和写回?
3.写日志都包含哪些流程?












BookKeeper背景BK是一个可靠的日志流记录系统,用于将系统产生的日志(也可以是其他数据)记录在BK集群上,由BK这个第三方Storage保证数据存储的可靠和一致性。典型场景是系统写write-ahead log,即先把log写到BK上,再对log做处理,比如将log写到内存的数据结构中。BookKeeper同时适用于任何单点写入并要求保证高性能和数据不丢失(Strong Durabilty Guarantees)的场景。
BK诞生于Hadoop2.0的namenode HA。在Hadoop中,出于故障恢复的考虑,Namenode在对它的记录做修改前都会先将本条修改的日志写到磁盘上。但是这里有一个潜在问题,当Namenode发生故障时,很可能连本地磁盘也不能访问,这时之前的记录的日志也就没用了。基于上述考虑,可以将Namenode的日志信息保存在一个可靠的外部Storage中。最初业界通过NFS这样的Share Storage来实现日志同步。之所以选择NFS,一方面因为可以很方便地实现数据共享,另外一方面是因为NFS相对稳定成熟。虽然如此,NFS也有缺点不能满足HDFS的在线存储业务:网络单点及其存储节点单点。为了满足共享日志的高可用性,社区引入了BK。除此之外还有默认的HA方案:QJM。

BookKeeper介绍
BK带有多个读写日志的server,称为 bookies。每一个bookie是一个bk的存储服务,存储了写到bk上的write-ahead日志,及其数据内容。写入的log流(称它为流是因为BK记录的是byte[])称为 ledgers,一个ledger是一个日志文件,每个日志单元叫 ledger  entry,也就是bookies是存ledgers的。ledger只支持append操作,而且同时只能有一个单线程来写。ZK充当BK的元数据存储服务,在zk中会存储ledger相关的元数据,包括当前可用的bookies,ledger分布的位置等。
BK通过读写多个存储节点达到高可用性,同时为了恢复由于异常造成的多节点数据不一致性,引入了数据一致性算法。BK的可用性还体现在只要有足够多的bookies可用,整个服务就可用。实际上,一份entry的写入需要确保N份日志冗余在N个bookie上写成功,而我们需要>N个bookie提供服务。在启动BK的时候,需要指定一个ensemble值,即bookie可用的最小节点数量,还需要指定一个quorums值,即日志写入bk服务端的冗余份数。BK的可靠性体现在服务有多个备份,entry的记录也是冗余的。BK的可扩展性体现在可以增加bookie服务的定额数目,同时增加server数据可以一定程度提高吞吐量。
Ledger在BK中扮演了很重要的角色,其相关操作及其作用如下:

  • CreateLedger:创建一个空的ledger,此时会在zk中存储相关元数据;
  • AddEntry:添加一个记录到ledger中,如果客户端失败或者ledger已经关闭,则不能再追加entry;
  • openLedger:开始读取数据前,必须先打开ledger,如果某ledger处于未关闭,不能读取相关数据,如果有异常,需先恢复;
  • readEntries:读取ledger中的entry

从编码角度讲,操纵entry读写的类为LedgerHandle,LedgerHandle对应一个可以被client读写entry的ledger。下面是创建ledgerHandle并读写entry的例子。


  1. ClientConfiguration conf = new ClientConfiguration();
  2. conf.setZkServers("zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181");
  3. BookKeeper client = new BookKeeper(conf);
  4. LedgerHandle lh = client.createLedger(3, 2, DigestType.CRC32, "foobar");
  5. lh.addEntry("Hello World!".getBytes());
  6. lh.close();
  7. LedgerHandle lh2 = client.openLedger(1, DigestType.CRC32, "foobar");
  8. long lastEntry = lh2.getLastAddConfirmed();
  9. Enumeration<LedgerEntry> entries = lh2.readEntries(0, 9);
  10. while (entries.hasMoreElements()) {
  11.         byte[] bytes = entries.nextElement().getEntry();
  12.         System.out.println(new String(bytes));
  13. }
复制代码

更多BK文档可以参考官网文档


BookKeeper in HDFSHdfs有两个抽象类提供对EditLog的读出和写回:EditLogOutputStream(以下简称ELOS)和EditLogInputStream(以下简称ELIS)。同时还有一个JournalManager接口,负责管理EditLog的可靠存取。它的实现包括QJM(QuorumJournalManager)和BKJM(BookKeeperJournalManager)。

写日志
对于hdfs而言,主节点写的每一个日志对象为BK的entry,entry的集合组成一个ledger,每一个日志段对应一个ledger,相同日志段追加edits即为向ledger追加entry。Ledger有一个递增的ledgerId,entry也有递增的entryId,每个entryId对应一个txId。
ELOS使用write()将FSEditLogOp往外写,对应的BookKeeperEditLogOutputStream的实现为:


  1. @Override
  2.   public void write(FSEditLogOp op) throws IOException {
  3.     writer.writeOp(op);
  4.     if (bufCurrent.getLength() > transmissionThreshold) {
  5.       transmit();
  6.     }
  7.   }
复制代码

BookKeeperEditLogOutputStream内部有一个buffer,每次调用write()写FSEditLogOp的时候,会由一个Writer将此次FSEditLogOp写入buffer,当buffer长度达到门槛值时,进行transmit操作:把buffer里的editLog发送到BK上,代码如下:

  1. /**
  2.    * Transmit the current buffer to bookkeeper.
  3.    * Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
  4.    * are never called at the same time.
  5.    */
  6.   private void transmit() throws IOException {
  7.     if (!transmitResult.compareAndSet(BKException.Code.OK,
  8.                                      BKException.Code.OK)) {
  9.       throw new IOException("Trying to write to an errored stream;"
  10.           + " Error code : (" + transmitResult.get()
  11.           + ") " + BKException.getMessage(transmitResult.get()));
  12.     }
  13.     if (bufCurrent.getLength() > 0) {
  14.       byte[] entry = Arrays.copyOf(bufCurrent.getData(),
  15.                                    bufCurrent.getLength());
  16.       lh.asyncAddEntry(entry, this, null);
  17.       bufCurrent.reset();
  18.       outstandingRequests.incrementAndGet();
  19.     }
  20.   }
复制代码


lh为BK的LedgerHandle,asyncAddEntry方法异步将entry写往一个open状态的ledger。这就是一个简单的把Editlog写往BK的过程。
BKJM简单写的代码如下:

  1. public void testSimpleWrite() throws Exception {
  2.     NamespaceInfo nsi = newNSInfo();
  3.     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
  4.         BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
  5.     bkjm.format(nsi);
  6.     EditLogOutputStream out = bkjm.startLogSegment(1);
  7.     for (long i = 1 ; i <= 100; i++) {
  8.       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
  9.       op.setTransactionId(i);
  10.       out.write(op);
  11.     }
  12.     out.close();
  13.     bkjm.finalizeLogSegment(1, 100);
  14.     String zkpath = bkjm.finalizedLedgerZNode(1, 100);
  15.   }
复制代码

BKJM的startLogSegment(txId)将产生一个新的ledger,对应一个新的日志段,该日志段状态为接收写入日志的状态。创建ledger之前有一些校验工作

  1. if (txId <= maxTxId.get()) {
  2.       throw new IOException("We've already seen " + txId
  3.           + ". A new stream cannot be created with it");
  4.     }
  5.     try {
  6.       String existingInprogressNode = ci.read();
  7.       if (null != existingInprogressNode
  8.           && zkc.exists(existingInprogressNode, false) != null) {
  9.         throw new IOException("Inprogress node already exists");
  10.       }
  11.       if (currentLedger != null) {
  12.         // bookkeeper errored on last stream, clean up ledger
  13.         currentLedger.close();
  14.       }
  15.       currentLedger = bkc.createLedger(ensembleSize, quorumSize,
  16.                                        BookKeeper.DigestType.MAC,
  17.                                        digestpw.getBytes());
  18.     } catch (BKException bke) {
  19.       throw new IOException("Error creating ledger", bke);
  20.     } catch (KeeperException ke) {
  21.       throw new IOException("Error in zookeeper while creating ledger", ke);
  22.     } catch (InterruptedException ie) {
  23.       Thread.currentThread().interrupt();
  24.       throw new IOException("Interrupted creating ledger", ie);
  25.     }
复制代码


Ledger的创建还对应一个新的EditLogLedgerMetadata,该类记录这个日志段的元信息,包括zkPath,ledgerId,开始和结束txId等,在读取ledger里的日志内容的时候需要这些元数据信息。
BKJM的finalizeLogSegment()将文件由正在写入日志的状态转化为不接收写日志的状态。BKJM会create ledger,delete ledger,open ledger,这里的ledger即LedgerHandler类,它对每个ledger entry进行读写操作。

写日志总体流程
ZK作为BK的元数据服务器,里面存储了哪些bookie服务是可用的,同时也记录了目前系统有哪些ledger,及其ledger相关信息,如该ledger数据存储在哪些机器上,及其该ledger起始,结束entryid等。Bookie节点存储实际的数据,及其数据的读写服务。
写操作由主节点来完成,当主节点调用setReadyToFlush操作,会调用RPC同时向N(N=quorums)个bookie节点写,flush异步等待响应。
主节点对bk的操作,其实就是对ledger的操作,在开始向bk服务写数据前,首先需要打开ledger,打开ledger就会与配置的所有bookie节点建立连接;打开连接后,数据以entry为单位以RR算法选择向N(N=quorums)个bookie节点写entry数据,并且异步地等待结果返回,有任何一个bookie写入失败,则需要重新选择一个bookie写入失败的副本。
当bookie服务端接收到写入数据后,首先会写日志,然后根据同步或者异步算法将数据同步到磁盘上。写入数据过程中,首先会写入log文件,写入的内容包含ledgerid,entryid,EntrySize,LastConfirmed,及其真实数据内容。然后在相应ledger文件中记录下entryid,及其该entry所在的日志文件,偏移量等。


读日志读日志相比写日志过程,相对简单一些。同样,读日志过程也支持高可用。BKJM通过selectInputStreams方法读出一个范围内的ELIS集合,每个ELIS是BookKeeperEditLogInputStream类,new BookKeeperEditLogInputStream需要得到一个EditLogLedgerMetadata,并打开对应的ledger。具体BookKeeperEditLogInputStream类里的内容就不详细说明了。


  1. @Override
  2.   public void selectInputStreams(Collection<EditLogInputStream> streams,
  3.       long fromTxId, boolean inProgressOk, boolean forReading)
  4.       throws IOException {
  5.     List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
  6.         inProgressOk);
  7.     try {
  8.       BookKeeperEditLogInputStream elis = null;
  9.       for (EditLogLedgerMetadata l : currentLedgerList) {
  10.         long lastTxId = l.getLastTxId();
  11.         if (l.isInProgress()) {
  12.           lastTxId = recoverLastTxId(l, false);
  13.         }
  14.         // Check once again, required in case of InProgress and is case of any
  15.         // gap.
  16.         if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
  17.           LedgerHandle h;
  18.           if (l.isInProgress()) { // we don't want to fence the current journal
  19.             h = bkc.openLedgerNoRecovery(l.getLedgerId(),
  20.                 BookKeeper.DigestType.MAC, digestpw.getBytes());
  21.           } else {
  22.             h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
  23.                 digestpw.getBytes());
  24.           }
  25.           elis = new BookKeeperEditLogInputStream(h, l);
  26.           elis.skipTo(fromTxId);
  27.         } else {
  28.           // If mismatches then there might be some gap, so we should not check
  29.           // further.
  30.           return;
  31.         }
  32.         streams.add(elis);
  33.         if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
  34.           return;
  35.         }
  36.         fromTxId = elis.getLastTxId() + 1;
  37.       }
  38.     } catch (BKException e) {
  39.       throw new IOException("Could not open ledger for " + fromTxId, e);
  40.     } catch (InterruptedException ie) {
  41.       Thread.currentThread().interrupt();
  42.       throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
  43.     }
  44.   }
复制代码


首先选择日志文件,建立输入流。从节点触发消化日志后,首先会查询ZK,获取到主节点写入ZK的edits元数据信息(不包含inprocess状态的edits元数据),这个元数据包含日志段的startTxid,lastTxid,ledgerID,同时也会打开相应的ledger,并获取其元数据,如ledger的quorumSize,ensembleSize,lastEntryId等,同时按照txid先后顺序对ledger进行排序,放入输入流集合。需要强调的是,当打开ledger时,会检查其entry副本之间的一致性,如果不一致需恢复。
准备好输入流以后,开始消化日志,依次操作输入流集合的ledgers,读取每个ledger内的entry:

  • 通过查询ledger元数据,同时通过RR算法确定该entry存储在哪几个bookies;
  • 尝试从bookies集合的第一个bookie服务读取entry,如果成功,该entry就读取成功,如果失败,转入第3步;
  • 尝试从bookies集合的第二个bookie服务读取entry,如果成功,该entry就读取成功,如果失败,依次类推,如果尝试读取完所有的bookies均失败,则该entry读取失败;


恢复
BKJM还有恢复机制,相关接口有recoverUnfinalizedSegments(),recoverLastTxId()。Bookie数据恢复检查通过定时或者人工发起,集群数据修复流程:

  • 通过zk查询到ledger元数据;
  • 通过元数据,查询相关bookie中存储的ledger的entry是否完整;
  • 如果查询到存储在某bookie上的entry不完整,则需要进入数据恢复流程;
  • 首先从bk服务端读取到ledger相关的entry,然后将其写到需要恢复entry的某bookie服务端;
  • Ledger数据恢复完成后,需要更新ledger的segment相关元数据。



总结
本文首先介绍了BookKeeper的背景和使用场景,然后简单介绍了BK的主要部件及使用方法,最后粗略地分析了hadoop2.0 namenode BKJM的HA实现,介绍了EditLog写入和读出BK的过程。通过阅读hadoopBKJM部分的代码,帮助学习怎样在自己的系统里加入BookKeeper,让BK来保证日志的可靠和容灾恢复等功能。








没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条