第一个和第二个观点我比较赞同;至于第三个,需要真正理解作者的意思,目前很多都只是猜测。下面提供一些资料,仅供参考:
/**
 * Reads the inclusive byte range [start, end] into buf, starting at
 * buf[offset], and loops until one attempt returns exactly
 * (end - start + 1) bytes.
 *
 * Each iteration re-fetches the block via getBlockAt(), picks a datanode
 * with chooseDataNode(), and reads either through a local (short-circuit)
 * reader or through a BlockReader over a freshly connected socket.
 * How a failed attempt is handled:
 * - AccessControlException from the local reader: short-circuit reads are
 *   switched off and the loop retries; the node is NOT marked dead.
 * - ChecksumException: the failure is reported, then the node is marked dead.
 * - IOException for which tokenRefetchNeeded() is true (at most once, see
 *   refetchToken): the block is re-fetched and the loop retries; the node
 *   is NOT marked dead.
 * - Any other IOException, including the "truncated return" one thrown
 *   below on a short read: logged, then the node is marked dead.
 *
 * @param block  block to read; it is replaced at the top of every iteration
 *               by getBlockAt(block.getStartOffset(), false)
 * @param start  first byte to read (inclusive), handed to the reader as its
 *               start position. NOTE(review): presumably an offset within
 *               the block rather than the file -- confirm with callers.
 * @param end    last byte to read (inclusive)
 * @param buf    destination buffer
 * @param offset index in buf at which the first byte is stored
 * @throws IOException never for the failures handled inside the loop's try;
 *         it can only escape from the calls made outside it (getBlockAt,
 *         chooseDataNode) or from fetchBlockAt in the IOException handler.
 *         NOTE(review): presumably chooseDataNode throws once no usable
 *         datanode remains, which would be what ends the loop -- confirm.
 */
private void fetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset) throws IOException {
//
// Connect to best DataNode for desired Block, with potential offset
//
// NOTE(review): dn is declared outside the loop and only assigned on the
// remote-read path, so on a later short-circuit iteration the finally block
// calls closeSocket() on the previous iteration's socket again --
// presumably harmless; confirm.
Socket dn = null;
int refetchToken = 1; // only need to get a new access token once
while (true) {
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
block = getBlockAt(block.getStartOffset(), false);
// Choose a DataNode to read the data from
DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
BlockReader reader = null;
// Number of bytes requested; the range is inclusive at both ends.
// NOTE(review): narrowing cast -- assumes the range fits in an int; confirm callers.
int len = (int) (end - start + 1);
try {
Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
// first try reading the block locally.
if (shouldTryShortCircuitRead(targetAddr)) {
try {
reader = getLocalBlockReader(conf, src, block.getBlock(),
accessToken, chosenNode, DFSClient.this.socketTimeout, start);
} catch (AccessControlException ex) {
LOG.warn("Short circuit access failed ", ex);
//Disable short circuit reads
shortCircuitLocalReads = false;
// Retry without marking the node dead; the outer finally still runs first.
continue;
}
} else {
// Short-circuit read is not being tried for this address;
// go to the datanode
// Create the socket connection
dn = socketFactory.createSocket();
NetUtils.connect(dn, targetAddr, socketTimeout);
dn.setSoTimeout(socketTimeout);
// Use the connected socket to build a reader that reads the data from the DataNode
reader = BlockReader.newBlockReader(dn, src,
block.getBlock().getBlockId(), accessToken,
block.getBlock().getGenerationStamp(), start, len, buffersize,
verifyChecksum, clientName);
}
// Read the data
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
// This exception is caught by the catch (IOException) below, so a short
// read marks the node dead and retries instead of failing the call.
throw new IOException("truncated return from reader.read(): " +
"excpected " + len + ", got " + nread);
}
// Full range read successfully; the finally block still closes reader and dn.
return;
} catch (ChecksumException e) {
LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
src + " at " + block.getBlock() + ":" +
e.getPos() + " from " + chosenNode.getName());
reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
// One-time retry after re-fetching the block; the node is not marked dead.
refetchToken--;
fetchBlockAt(block.getStartOffset());
continue;
} else {
LOG.warn("Failed to connect to " + targetAddr + " for file " + src
+ " for block " + block.getBlock() + ":" + e);
if (LOG.isDebugEnabled()) {
LOG.debug("Connection failure ", e);
}
}
} finally {
// Runs on every exit from the try: success, continue, or failure.
IOUtils.closeStream(reader);
IOUtils.closeSocket(dn);
}
// Put chosen node into dead list, continue
// If the read failed, mark this DataNode as a failed node
addToDeadNodes(chosenNode);
}
}
|