一、背景:
用户不断向Hadoop集群提交Job,任务在集群运行过程中可能处于以下状态之一:等待、执行中、执行成功、执行失败。
现在使用ZooKeeper对其进行集中管理:第一次处理失败的任务通过回调再次执行;当任务多次执行失败时,系统将
认定此任务存在错误,停止对此任务的回调并将其记录下来,同时保存执行成功的任务。
二、设计方案
设定节点/root为所有被存储在节点的根节点,并设置四个存储不同状态的节点:/root/wait /root/processed /root/temp /root/error
节点名称为提交该任务时产生的任务编号(JobID),存储的值为对应的回调操作的shell指令
三、设计实现
/**
 * Reads server start-up parameters from a properties file.
 *
 * <p>Used by the scheduling/monitor servers to load the ZooKeeper
 * connection string, session timeout and Hadoop settings.
 */
public class InitConfReader {
    // Path of the properties file holding the configuration.
    private String confFileUrl;

    /**
     * @param confFileUrl path of the properties file to read
     *                    (the other classes pass "init.properties")
     */
    public InitConfReader(String confFileUrl) {
        this.confFileUrl = confFileUrl;
    }

    /**
     * Looks up the given keys in the configuration file.
     *
     * @param keys configuration keys to read
     * @return map from key to its value; a key absent from the file
     *         (or an unreadable file) maps to {@code null}
     */
    public Map<String, String> getConfs(List<String> keys) {
        Map<String, String> result = new HashMap<String, String>();
        Properties properties = new Properties();
        // try-with-resources closes the reader even on error
        // (the original leaked the FileReader and had unbalanced parentheses)
        try (FileReader reader = new FileReader(new File(confFileUrl))) {
            properties.load(reader);
        } catch (IOException e) {
            // FileNotFoundException is an IOException, so one catch covers both.
            e.printStackTrace();
        }
        for (String key : keys) {
            result.put(key, properties.getProperty(key));
        }
        return result;
    }
}
②初始化与维护原始数据节点
public class SchedulingServer implments Watcher{
private ZooKeeper zooKeeper;
privae String connectString; //conectString连接字符串,包括IP地址,服务器端口号
private int session Timeout;
public void initConf() thows Exception{
InitConfReader reader=new InitConfReader(“init.properties”);
List<String>keys=new ArrayList<String>();
keys.add(“connectSring”);
keys.add(“sessionTimeout”);
Map<String ,String>confs=reader.getConfs(keys);
this.connectString=conf.get(“connectString”);
this.sessionTimeout=Integer.parseInt(confs.get(“sessionTimeout”));
zooKeeper=new ZooKeeper(connectString, sessionTimeout ,this);
}
/*
1.整个系统中的所有静态节点均被创建。
2.“/root”节点被创建,“/root/client”未被创建
3.“/root/client”被创建但其下子节点一个或多个未被创建.
4.存储状态的一个或多个节点未被创建
*/
public void initServer() throws Exception{
//stat用于存储被监测节点是否存在,若不存在则对应的值为null
Stat stat=zooKeeprr.exits(“/root” , false);
if(stat==null){
//根节点
zooKeeper.create(“/root”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
//失败任务存储节点
zooKeeper.create(“/root/error”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
//成功任务存储节点
zooKeeper.create(“/root/processed”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
//等待和正在运行任务存储节点
zooKeeper.create(“/root/wait”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
//临时存储第一次处理失败的节点
zooKeeper.create(“/root/temp”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
}
stat=zooKeeper.exists(“root/error”, false);
if(stat==null){
zooKeeper.create(“/root/error”,null,Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISITENT);
}
stat=zooKeeper.exists(“/root/processed”,false);
if(stat==null){
zooKeeper.create(“/root/processed”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
}
stat=zooKeeper.exists(“root/wait”,false);
if(stat==null){
zooKeeper.create(“/root/wait”,null,Ids.OPEN_ACL_UNSAFE,createMode.PERSISTENT);
}
}
public void process(WatchedEvent event){
}
}
③检测节点中存储的任务编号所对应的Job运行状态
public class ServerMonitor implements Watcher,Runnable{
private ZooKeeper zooKeeper;
private String connectString;
private int session Timeout;
private String hadoopHome;
private String mapredJob Trachker;
//初始化文件加载,并用其内容配置ZooKeeper服务器的连接
public void initConf() throws Exeception{
InitConfReader reader=new InitConfReader(“init.properties”);
List<String> keys=new ArrayList<String>();
keys.add(“connectString”);
keys.add(“sessionTimeout”);
keys.add(“hadoopHome”);
keys.add(“mapred.job.tracher”);
Map<String , String>confs=reader.getConfs(keys);
this.hadoopHome=confs.get(“hadoopHome”);
this.mapredJobTracker=confs.get(“mapred.job.tracker”);
zooKeeper=new ZooKeeper(connectString,sessionTimeout,this);
//监视节点中存储的任务状态变化
public ServerMonitor() throws Exeeption{
SchedulingServer schedulingServer=new SchedulingServer();
schedulingServer.initConf();
schedulingServer.initServer();
public void process(WatchedEvent event){ }
/* 一个任务可能出于:等待,运行,成功,失败,杀死等状态中的一个
1.任务出于等待或运行状态,不做任何操作,继续检测任务状态,知道状态发生变化
2.任务出于成功状态,从”/root/client/wait”中删除,并将其插入到”/root/clients/processed”当中,并停止对该节点进行检测
3.程序第一次出于失败或杀死状态,将任务插入“/root/client/temp”中,并回调,如果连续两次都是失败或被杀死,则将其插入”/root/client/error”并停止对此任务的检测。
public void monitorNode() throws Exception{
List<String>waits=zooKeeper.getChildren(“/root/clien/wait” ,false);
JobConf conf=new JobConf();
conf.set(“mapred.job.tracker” , mapredJobTracker);
JobClient jobClient=new JobClient(conf);
for(String wait:waits){
String data=new String(zooKeeper.getdata(“/root/client/wait/”+wait,false,null);
JobID jobid=null; try{
jobid=JobID.forName(wait);
System.out.println(“hob id is wrong!!!”) ;
Stat stat=zooKeeper.exists(“/root/client/error/”+wait,false);
zooKeeper.delete(“/root/client/error/”+wait,-1); }
zooKeeper.delete(“/root/client/wait/”+wait,-1);
zooKeeper.create(“/root/client/error/”+wait,data,getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); continus; }
//通过任务的JOBID来检测任务正在处于的状态
int runStat=jobClient.getJob(org.apache.hadoop.mapred.JobID)jobid).getJobState();
//处于等待和运行状态的任务在状态不发生改变前不做处理
case JobStatus.RUNNING:
//当任务执行成功后,删除原”/root/wait”目录下的节点并将其任务信息插入到“/root/wait/processed”
case JobStatus.SUCCEEDED:
zooKeeper.delete(“/root/client/wait/”+wait,-1);
zooKeeper.create(“/root/client/processed/”+wait,data.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
List<String>tempNodes=zooKeeper.getChildren(“/root/client/temp”,false);
if(tempNodes==null || tempNodes.size()==0) { break; }
for(String tempNode:tempNodes){
if(new String(zooKeeper,getData(“/root/client/temp”+tempNode,false,null)).equals(data)){ zooKeeper.delete(“/root/client/temp/”+tempNode,-1); } } }
//当任务执行失败或者任务呗杀掉,将会把任务插入“/root/temp”下并回调任务,如果任务回调后失败,则将任务插入”root/error”
case JobSatus.FAILED:
case JobStatus.KILLED:
zooKeeper.delete(“/root/client/wait/”+wait,-1);
tempNodes=zooKeeper.getChildren(“/root/client/temp”,false);
zooKeeper.create(“/root/client/temp”+wait,data.getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
if(tempNodes==null || tempNode.size()==0){
shellTool.callBack(data,hadoopHome); }
for(String tempNode:tempNodes){
if(new String(zooKeeper.geData(“/root/client/temp”+temNode,false,null)).equals(data)){ zooKeeper.create(“/root/client/error/”+wait,data.getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
zooKeeper.delete(“/root/client/temp/”+wait, -1);
zooKeeper.delete(“/root/client/temp”+ tempNode, -1); flag=false; } }
ShellTool.callBack(data, hadoopHome); } }
public void run() {
ServerMonitor serverWaitMonitor = new ServerMonitor();
ServerWaitMonitor.monitorNode(); Thread.sleep(5000);
} } catch(Exception e){
e.printStackTrace(); } }
public static void main(String[] args) throws Exception{
Thread thread =new Thread(new ServerMonitor());
来自群组: Hadoop技术组 |
|