Questions to consider:
1. How do you set up the Configuration? 2. How do you obtain DataNode information? 3. How do you obtain the Active NameNode?
Software versions: Hadoop 2.2, JDK 1.7, IntelliJ IDEA 14
0. Setting up the Configuration
If you want a Java program to obtain NameNode or DataNode information for a cluster, the Configuration needs to be set up as follows:
[mw_shl_code=java,true]
/**
 * Get the configured Configuration
 *
 * @return the shared Configuration instance
 */
public static Configuration getConf() {
    if (conf == null) {
        conf = new Configuration();
        conf.setBoolean("mapreduce.app-submission.cross-platform", true); // enable cross-platform job submission
        conf.set("fs.defaultFS", "hdfs://" + namenode + ":8020"); // NameNode address
        conf.set("mapreduce.framework.name", "yarn"); // use the YARN framework
        conf.set("yarn.resourcemanager.address", resourcenode + ":8032"); // ResourceManager address
        conf.set("yarn.resourcemanager.scheduler.address", schedulernode + ":8030"); // ResourceManager scheduler address
        conf.set("mapreduce.jobhistory.address", jobhistorynode + ":10020"); // JobHistory server address
    }
    return conf;
}[/mw_shl_code]
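The snippet above refers to a few static fields (conf, namenode, resourcenode, schedulernode, jobhistorynode) that are not shown in the post. A minimal sketch of how they might be declared; the host names are placeholders, not values from the original code:
[mw_shl_code=java,true]
// Hypothetical field declarations assumed by getConf(); replace the host names with your own cluster's.
private static Configuration conf = null;
private static String namenode = "master";       // NameNode host
private static String resourcenode = "master";   // ResourceManager host
private static String schedulernode = "master";  // ResourceManager scheduler host
private static String jobhistorynode = "master"; // JobHistory server host
[/mw_shl_code]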
1. Obtaining DataNode information from a Hadoop cluster (with or without HA enabled)
[mw_shl_code=java,true]
/**
 * Get a live DataNode
 * @return the first live DataNode, or null if none could be obtained
 */
public static DatanodeInfo getDATANODE() {
    if (DATANODE == null) {
        try {
            DatanodeInfo[] live = ((DistributedFileSystem) getFS())
                    .getDataNodeStats(HdfsConstants.DatanodeReportType.LIVE);
            if (live.length < 1) {
                log.error("Failed to find any live DataNode!");
                return null; // avoid indexing into an empty array
            }
            DATANODE = live[0];
        } catch (IOException e) {
            log.warn("Exception while getting live DataNodes!\n" + e.getMessage());
        }
    }
    return DATANODE;
}[/mw_shl_code]
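getDATANODE() calls a getFS() helper that is not shown in the post. A minimal sketch, assuming it simply caches a FileSystem created from getConf(), so that with fs.defaultFS pointing at an hdfs:// URI the returned object is a DistributedFileSystem:
[mw_shl_code=java,true]
// Hypothetical helper assumed by getDATANODE(): lazily builds a FileSystem from getConf().
private static FileSystem fs = null;

public static FileSystem getFS() {
    if (fs == null) {
        try {
            fs = FileSystem.get(getConf()); // DistributedFileSystem when fs.defaultFS is an hdfs:// URI
        } catch (IOException e) {
            log.error("Failed to initialize the FileSystem!\n" + e.getMessage());
        }
    }
    return fs;
}
[/mw_shl_code]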
2. Obtaining the Active NameNode from an HA-enabled Hadoop cluster:
[mw_shl_code=java,true]package org.apache.hadoop.hdfs.tools;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HAServiceTarget;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Dynamically obtain the Active NameNode of an HA cluster.
 * Created by fansy on 2016/3/29.
 */
public class DFSHAAdminUtils extends DFSHAAdmin {
    private Logger log = LoggerFactory.getLogger(DFSHAAdminUtils.class);
    private int rpcTimeoutForChecks = -1;
    private String nnidStr;

    public DFSHAAdminUtils() {}

    public DFSHAAdminUtils(Configuration conf, String nnidStr) {
        super.setConf(conf);
        if (conf != null) {
            rpcTimeoutForChecks = conf.getInt(
                    CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_KEY,
                    CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_DEFAULT);
        }
        this.nnidStr = nnidStr;
    }

    /**
     * Query each NameNode ID in turn and return the host:port of the one
     * whose HA state is ACTIVE, or null if none is found.
     */
    public String getActiveNameNode() {
        String[] nnids = nnidStr.split(",", -1);
        HAServiceProtocol proto = null;
        HAServiceTarget haServiceTarget = null;
        HAServiceStatus state = null;
        try {
            for (String nnid : nnids) {
                haServiceTarget = resolveTarget(nnid);
                proto = haServiceTarget.getProxy(getConf(), rpcTimeoutForChecks);
                state = proto.getServiceStatus();
                if (state.getState() == HAServiceProtocol.HAServiceState.ACTIVE) {
                    log.info("Active NameNode: {}",
                            haServiceTarget.getAddress().getHostName() + ":" + haServiceTarget.getAddress().getPort());
                    return haServiceTarget.getAddress().getHostName() + ":" + haServiceTarget.getAddress().getPort();
                }
            }
        } catch (IOException e) {
            log.error("Exception while obtaining the Active NameNode!", e);
        }
        return null;
    }
}[/mw_shl_code]
Note that DFSHAAdmin is a class that ships with Hadoop; the implementation above is modeled on its internal logic.
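As a rough usage sketch: the nameservice ID, NameNode IDs, and host names below are placeholders, assuming an HA setup with two NameNodes registered as nn1 and nn2 in the Configuration:
[mw_shl_code=java,true]
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSHAAdminUtils;

public class ActiveNameNodeDemo {
    public static void main(String[] args) {
        // Hypothetical HA settings; in practice these usually come from hdfs-site.xml on the classpath.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://mycluster");
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "master1:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "master2:8020");

        // Pass the same NameNode IDs that appear in dfs.ha.namenodes.<nameservice>.
        DFSHAAdminUtils haAdmin = new DFSHAAdminUtils(conf, "nn1,nn2");
        String active = haAdmin.getActiveNameNode(); // e.g. "master1:8020", or null if the lookup fails
        System.out.println("Active NameNode: " + active);
    }
}
[/mw_shl_code]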