本帖最后由 desehawk 于 2016-6-23 18:28 编辑
[mw_shl_code=bash,true] /**
 * Sets the file pointer in the local block file to the specified value.
 *
 * <p>For a block that is already finalized, this only validates the request:
 * it fails unless {@code isRecovery} is set, and fails if the requested offset
 * lies beyond the current length of the block. Otherwise, when the block
 * file's channel position differs from {@code offsetInBlock}, the matching
 * offset in the checksum (meta) file is derived as
 * {@code headerSize + (offsetInBlock / bytesPerChecksum) * checksumSize} and
 * both files are repositioned. If the new offset falls inside a checksum
 * chunk, the CRC of the data already present in that chunk is loaded first
 * via {@link #computePartialChunkCrc}.
 *
 * <p>NOTE(review): this listing had lost its closing braces and several
 * statement tails; the lines marked "restored" below follow the upstream
 * Hadoop implementation and should be confirmed against it.
 *
 * @param offsetInBlock byte offset within the block file to seek to
 * @throws IOException if the block is finalized and this is not a recovery,
 *         if the offset is past the end of a finalized block, or if any of
 *         the underlying stream operations fails
 */
private void setBlockPosition(long offsetInBlock) throws IOException {
  if (finalized) {
    if (!isRecovery) {
      throw new IOException("Write to offset " + offsetInBlock +
                            " of block " + block +
                            " that is already finalized.");
    }
    if (offsetInBlock > datanode.data.getLength(block)) {
      throw new IOException("Write to offset " + offsetInBlock +
                            " of block " + block +
                            " that is already finalized and is of size " +
                            datanode.data.getLength(block)); // restored
    }
    // restored: a finalized block is only validated, never repositioned
    return;
  }

  if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
    return; // nothing to do
  }

  // One checksum of checksumSize bytes is stored per bytesPerChecksum bytes
  // of data, after the metadata header.
  long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
                          offsetInBlock / bytesPerChecksum * checksumSize;

  // restored: push out buffered bytes before the channel position is moved
  if (out != null) {
    out.flush();
  }
  if (checksumOut != null) {
    checksumOut.flush();
  }

  // If this is a partial chunk, then read in pre-existing checksum
  if (offsetInBlock % bytesPerChecksum != 0) {
    LOG.info("setBlockPosition trying to set position to " +
             offsetInBlock + " for " + block +
             " which is not a multiple of bytesPerChecksum " +
             bytesPerChecksum); // restored
    computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Changing block file offset of block " + block + " from " +
              datanode.data.getChannelPosition(block, streams) +
              " to " + offsetInBlock +
              " meta file offset to " + offsetInChecksum);
  }

  // set the position of the block file
  datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
}
* reads in the partial crc chunk and computes checksum
* of pre-existing data in partial chunk.
private void computePartialChunkCrc(long blkoff, long ckoff,
int bytesPerChecksum) throws IOException {
// find offset of the beginning of partial chunk.
int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
int checksumSize = checksum.getChecksumSize();
blkoff = blkoff - sizePartialChunk;
LOG.info("computePartialChunkCrc sizePartialChunk " +
sizePartialChunk + " " + block +
" offset in block " + blkoff +
" offset in metafile " + ckoff);
// create an input stream from the block file
// and read in partial crc chunk into temporary buffer
byte[] buf = new byte[sizePartialChunk];
byte[] crcbuf = new byte[checksumSize];
FSDataset.BlockInputStreams instr = null;
try {
instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
// open meta file and read in crc value computer earlier
IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
} finally {
// compute crc of partial chunk from data read in the block file.
partialCrc = new PureJavaCrc32();
partialCrc.update(buf, 0, sizePartialChunk);
LOG.info("Read in partial CRC chunk from disk for " + block);
// paranoia! verify that the pre-computed crc matches what we
// recalculated just now
if (partialCrc.getValue() != FSInputChecker.checksum2long(crcbuf)) {
String msg = "Partial CRC " + partialCrc.getValue() +
" does not match value computed the " +
" last time file was closed " +
throw new IOException(msg);
//LOG.debug("Partial CRC matches 0x" +
// Long.toHexString(partialCrc.getValue()));
[mw_shl_code=bash,true]if (mirrorOut != null && !mirrorError) {
try {
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
} catch (IOException e) {
[mw_shl_code=bash,true] if (len == 0) {
LOG.debug("Receiving empty packet for " + block);
} else {
offsetInBlock += len;
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
if ( buf.remaining() != (checksumLen + len)) {
throw new IOException("Data remaining in packet does not match " +
"sum of checksumLen and dataLen");
int checksumOff = buf.position();
int dataOff = checksumOff + checksumLen;
byte pktBuf[] = buf.array();
buf.position(buf.limit()); // move to the end of the data.
/* skip verifying checksum iff this is not the last one in the
* pipeline and clientName is non-null. i.e. Checksum is verified
* on all the datanodes when the data is being written by a
* datanode rather than a client. When the client is writing the data,
* protocol includes acks and only the last datanode needs to verify
* checksum.
if (mirrorOut == null || clientName.length() == 0) {
verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
try {
if (!finalized) {
//finally write to the disk :
out.write(pktBuf, dataOff, len);
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
if (partialCrc != null) {
if (len > bytesPerChecksum) {
throw new IOException("Got wrong length during writeBlock(" +
block + ") from " + inAddr + " " +
"A packet can have only one partial chunk."+
" len = " + len +
" bytesPerChecksum " + bytesPerChecksum);
partialCrc.update(pktBuf, dataOff, len);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
LOG.debug("Writing out partial crc for data len " + len);
partialCrc = null;
} else {
checksumOut.write(pktBuf, checksumOff, checksumLen);
/// flush entire packet before sending ack
// update length only after flush to disk
datanode.data.setVisibleLength(block, offsetInBlock);
} catch (IOException iex) {
throw iex;
// put in queue for pending acks
if (responder != null) {
if (throttler != null) { // throttle I/O
return payloadLen;[/mw_shl_code]