分享

关于Map Task读数据块的疑问

gwgyk 发表于 2014-11-19 14:37:44 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 9024
本帖最后由 gwgyk 于 2014-11-19 15:52 编辑

在DFSClient.DFSInputStream的blockSeekTo()方法,是用来定位具体的block,并创建一个BlockReader来读取block。我有这么几个问题:
1、BlockReader有两个子类:BlockReaderLocal和RemoteBlockReader,他们分别是用来读取本节点上的数据块和非本地节点上的数据块,对吗?

2、BlockReaderLocal不通过DataNode而是直接从本地读取数据块,而RemoteBlockReader则要先和DataNode建立socket连接(无论本地与否),然后再读取,我这么说对吗?

3、hadoop中默认是不使用BlockReaderLocal的,因为我在调试的时候发现,只有当shouldTryShortCircuitRead(targetAddr)为true时(该方法在blockSeekTo()方法中被调用),才会创建BlockReaderLocal。以下是该方法的源码:
  1. private boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
  2.         throws IOException {
  3.       // Can't local read a block under construction, see HDFS-2757
  4.       return shortCircuitLocalReads && !blockUnderConstruction()
  5.         && isLocalAddress(targetAddr);
  6.     }
复制代码
但是shortCircuitLocalReads变量默认为false。为什么默认不使用BlockReaderLocal呢?
不好意思,又麻烦大家了,谢谢啦

已有(4)人评论

跳转到指定楼层
muyannian 发表于 2014-11-19 17:16:34





第一个、第二个还是比较赞同你的观点的,至于第三个,需要真正明白作者的意思,很多都是猜测,下面给你提供资料,仅供参考



private void fetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset) throws IOException {
//
// Connect to best DataNode for desired Block, with potential offset
//
Socket dn = null;
int refetchToken = 1; // only need to get a new access token once

while (true) {
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
block = getBlockAt(block.getStartOffset(), false);
//选择一个Datanode<strong>读取</strong>数据
DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
BlockReader reader = null;

int len = (int) (end - start + 1);
try {
Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
// first try reading the block locally.
if (shouldTryShortCircuitRead(targetAddr)) {
try {
reader = getLocalBlockReader(conf, src, block.getBlock(),
accessToken, chosenNode, DFSClient.this.socketTimeout, start);
} catch (AccessControlException ex) {
LOG.warn("Short circuit access failed ", ex);
//Disable short circuit reads
shortCircuitLocalReads = false;
continue;
}
} else {
//如果本地没有,
// go to the datanode
//创建Socket连接
dn = socketFactory.createSocket();
NetUtils.connect(dn, targetAddr, socketTimeout);
dn.setSoTimeout(socketTimeout);
//利用建立的Socket链接,生成一个reader负责从DataNode读取数据
reader = BlockReader.newBlockReader(dn, src,
block.getBlock().getBlockId(), accessToken,
block.getBlock().getGenerationStamp(), start, len, buffersize,
verifyChecksum, clientName);
}
//<strong>读取</strong>数据
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + len + ", got " + nread);
}
return;
} catch (ChecksumException e) {
LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
src + " at " + block.getBlock() + ":" +
e.getPos() + " from " + chosenNode.getName());
reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
refetchToken--;
fetchBlockAt(block.getStartOffset());
continue;
} else {
LOG.warn("Failed to connect to " + targetAddr + " for file " + src
+ " for block " + block.getBlock() + ":" + e);
if (LOG.isDebugEnabled()) {
LOG.debug("Connection failure ", e);
}
}
} finally {
IOUtils.closeStream(reader);
IOUtils.closeSocket(dn);
}
// Put chosen node into dead list, continue
//如果<strong>读取</strong>失败,则将此DataNode标记为失败节点
addToDeadNodes(chosenNode);
}
}



回复

使用道具 举报

gwgyk 发表于 2014-11-19 17:36:21
muyannian 发表于 2014-11-19 17:16
第一个、第二个还是比较赞同你的观点的,至于第三个,需要真正明白作者的意思,很多都是猜测,下 ...

这个fetchBlockByteRange方法是在RemoteBlockReader中调用的是吧?因为我在debug的时候,设置成了BlockReaderLocal,所以没有看到哪儿有调用fetchBlockByteRange这个方法。
另外我想问问,我想找一些关于hadoop设计思想的资料,比如说作者为什么要这么设计,某个类的设计思路等资料,我应该去哪儿找啊?
回复

使用道具 举报

gwgyk 发表于 2014-11-19 19:01:55
muyannian 发表于 2014-11-19 18:07
这个还真没有发现。不过这有一些源码分析的文档,你可以参考

好的,谢谢啦
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条