本帖最后由 howtodown 于 2014-9-10 11:49 编辑
可以看下面的代码,下面这个函数的作用是在 ZooKeeper 中设置一个 watch,用来监控 -ROOT- region 的位置。
你的 HBase 是伪分布式部署,还是完全分布式部署?
还有,下面这个属性你设置的是 true 还是 false?
请将 hbase.cluster.distributed 设置为 true。
* Puts a watch in ZooKeeper to monitor the file of the -ROOT- region.
* This method just registers an asynchronous callback.
*/
- private void getRootRegion() {
- final AsyncCallback.DataCallback cb = new AsyncCallback.DataCallback() {
- @SuppressWarnings("fallthrough")
- public void processResult(final int rc, final String path,
- final Object ctx, final byte[] data,
- final Stat stat) {
- if (rc == Code.NONODE.intValue()) {
- LOG.error("The znode for the -ROOT- region doesn't exist!");
- retryGetRootRegionLater(this);
- return;
- } else if (rc != Code.OK.intValue()) {
- LOG.error("Looks like our ZK session expired or is broken, rc="
- + rc + ": " + Code.get(rc));
- disconnectZK();
- connectZK();
- return;
- }
- if (data == null || data.length == 0 || data.length > Short.MAX_VALUE) {
- LOG.error("The location of the -ROOT- region in ZooKeeper is "
- + (data == null || data.length == 0 ? "empty"
- : "too large (" + data.length + " bytes!)"));
+ final class ZKCallback implements AsyncCallback.DataCallback {
+
+ @SuppressWarnings("fallthrough")
+ public void processResult(final int rc, final String path,
+ final Object ctx, final byte[] data,
+ final Stat stat) {
+ if (rc == Code.NONODE.intValue()) {
+ LOG.error("The znode for the -ROOT- region doesn't exist!");
+ retryGetRootRegionLater(this);
+ return;
+ } else if (rc != Code.OK.intValue()) {
+ LOG.error("Looks like our ZK session expired or is broken, rc="
+ + rc + ": " + Code.get(rc));
+ disconnectZK();
+ connectZK();
+ return;
+ }
+ if (data == null || data.length == 0 || data.length > Short.MAX_VALUE) {
+ LOG.error("The location of the -ROOT- region in ZooKeeper is "
+ + (data == null || data.length == 0 ? "empty"
+ : "too large (" + data.length + " bytes!)"));
+ retryGetRootRegionLater(this);
+ return; // TODO(tsuna): Add a watch to wait until the file changes.
+ }
+ // There are 3 cases. Older versions of HBase encode the location
+ // of the root region as "host:port", 0.91 uses "host,port,startcode"
+ // and newer versions of 0.91 use "<metadata>host,port,startcode"
+ // where the <metadata> starts with MAGIC, then a 4 byte integer,
+ // then that many bytes of meta data.
+ boolean newstyle; // True if we expect a 0.91 style location.
+ final short offset; // Bytes to skip at the beginning of data.
+ short firstsep = -1; // Index of the first separator (':' or ',').
+ if (data[0] == MAGIC) {
+ newstyle = true;
+ final int metadata_length = Bytes.getInt(data, 1);
+ if (metadata_length < 1 || metadata_length > 65000) {
+ LOG.error("Malformed meta-data in " + Bytes.pretty(data)
+ + ", invalid metadata length=" + metadata_length);
retryGetRootRegionLater(this);
return; // TODO(tsuna): Add a watch to wait until the file changes.
}
- // There are 3 cases. Older versions of HBase encode the location
- // of the root region as "host:port", 0.91 uses "host,port,startcode"
- // and newer versions of 0.91 use "<metadata>host,port,startcode"
- // where the <metadata> starts with MAGIC, then a 4 byte integer,
- // then that many bytes of meta data.
- boolean newstyle; // True if we expect a 0.91 style location.
- final short offset; // Bytes to skip at the beginning of data.
- short firstsep = -1; // Index of the first separator (':' or ',').
- if (data[0] == MAGIC) {
- newstyle = true;
- final int metadata_length = Bytes.getInt(data, 1);
- if (metadata_length < 1 || metadata_length > 65000) {
- LOG.error("Malformed meta-data in " + Bytes.pretty(data)
- + ", invalid metadata length=" + metadata_length);
- retryGetRootRegionLater(this);
- return; // TODO(tsuna): Add a watch to wait until the file changes.
- }
- offset = (short) (1 + 4 + metadata_length);
- } else {
- newstyle = false; // Maybe true, the loop below will tell us.
- offset = 0;
- }
- final short n = (short) data.length;
- // Look for the first separator. Skip the offset, and skip the
- // first byte, because we know the separate can only come after
- // at least one byte.
- loop: for (short i = (short) (offset + 1); i < n; i++) {
- switch (data[i]) {
- case ',':
- newstyle = true;
- /* fall through */
- case ':':
- firstsep = i;
- break loop;
- }
- }
- if (firstsep == -1) {
- LOG.error("-ROOT- location doesn't contain a separator"
- + " (':' or ','): " + Bytes.pretty(data));
- retryGetRootRegionLater(this);
- return; // TODO(tsuna): Add a watch to wait until the file changes.
+ offset = (short) (1 + 4 + metadata_length);
+ } else {
+ newstyle = false; // Maybe true, the loop below will tell us.
+ offset = 0;
+ }
+ final short n = (short) data.length;
+ // Look for the first separator. Skip the offset, and skip the
+ // first byte, because we know the separate can only come after
+ // at least one byte.
+ loop: for (short i = (short) (offset + 1); i < n; i++) {
+ switch (data[i]) {
+ case ',':
+ newstyle = true;
+ /* fall through */
+ case ':':
+ firstsep = i;
+ break loop;
}
- final String host;
- final short portend; // Index past where the port number ends.
- if (newstyle) {
- host = new String(data, offset, firstsep - offset);
- short i;
- for (i = (short) (firstsep + 2); i < n; i++) {
- if (data[i] == ',') {
- break;
- }
+ }
+ if (firstsep == -1) {
+ LOG.error("-ROOT- location doesn't contain a separator"
+ + " (':' or ','): " + Bytes.pretty(data));
+ retryGetRootRegionLater(this);
+ return; // TODO(tsuna): Add a watch to wait until the file changes.
+ }
+ final String host;
+ final short portend; // Index past where the port number ends.
+ if (newstyle) {
+ host = new String(data, offset, firstsep - offset);
+ short i;
+ for (i = (short) (firstsep + 2); i < n; i++) {
+ if (data[i] == ',') {
+ break;
}
- portend = i; // Port ends on the comma.
- } else {
- host = new String(data, 0, firstsep);
- portend = n; // Port ends at the end of the array.
- }
- final int port = parsePortNumber(new String(data, firstsep + 1,
- portend - firstsep - 1));
- final String ip = getIP(host);
- if (ip == null) {
- LOG.error("Couldn't resolve the IP of the -ROOT- region from "
- + host + " in \"" + Bytes.pretty(data) + '"');
- retryGetRootRegionLater(this);
- return; // TODO(tsuna): Add a watch to wait until the file changes.
}
- LOG.info("Connecting to -ROOT- region @ " + ip + ':' + port);
- final RegionClient client = rootregion = newClient(ip, port);
- final ArrayList<Deferred<Object>> ds = atomicGetAndRemoveWaiters();
- if (ds != null) {
- for (final Deferred<Object> d : ds) {
- d.callback(client);
- }
+ portend = i; // Port ends on the comma.
+ } else {
+ host = new String(data, 0, firstsep);
+ portend = n; // Port ends at the end of the array.
+ }
+ final int port = parsePortNumber(new String(data, firstsep + 1,
+ portend - firstsep - 1));
+ final String ip = getIP(host);
+ if (ip == null) {
+ LOG.error("Couldn't resolve the IP of the -ROOT- region from "
+ + host + " in \"" + Bytes.pretty(data) + '"');
+ retryGetRootRegionLater(this);
+ return; // TODO(tsuna): Add a watch to wait until the file changes.
+ }
+ LOG.info("Connecting to -ROOT- region @ " + ip + ':' + port);
+ final RegionClient client = rootregion = newClient(ip, port);
+ final ArrayList<Deferred<Object>> ds = atomicGetAndRemoveWaiters();
+ if (ds != null) {
+ for (final Deferred<Object> d : ds) {
+ d.callback(client);
}
- disconnectZK();
- // By the time we're done, we may need to find -ROOT- again. So
- // check to see if there are people waiting to find it again, and if
- // there are, re-open a new session with ZK.
- // TODO(tsuna): This typically happens when the address of -ROOT- in
- // ZK is stale. In this case, we should setup a watch to get
- // notified once the znode gets updated, instead of continuously
- // polling ZK and creating new sessions.
- synchronized (ZKClient.this) {
- if (deferred_rootregion != null) {
- connectZK();
- }
+ }
+ disconnectZK();
+ // By the time we're done, we may need to find -ROOT- again. So
+ // check to see if there are people waiting to find it again, and if
+ // there are, re-open a new session with ZK.
+ // TODO(tsuna): This typically happens when the address of -ROOT- in
+ // ZK is stale. In this case, we should setup a watch to get
+ // notified once the znode gets updated, instead of continuously
+ // polling ZK and creating new sessions.
+ synchronized (ZKClient.this) {
+ if (deferred_rootregion != null) {
+ connectZK();
}
}
- };
+ }
+ }
|