分享

hadoop集群算法调用--web平台2.0

问题导读
1、什么是mahout算法?
2、使用mahout算法以及hadoop的MR算法时,流程是怎样的?
2、如何去验证输入的jobtracker 和namenode机器名和端口是否正确?






前记
各种版本:Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit,struts2
项目源码:下载

项目继续沿用之前的框架struts2。当使用mahout算法以及hadoop的MR算法的时候,前台输入参数,action接收,然后启动一个线程,跳转到MR算法监控界面。如果是一般的操作,比如数据的查看以及转换那么前台输入参数后,后台直接处理,然后返回原页面,同时给出提示,不再新开线程。(项目新增了hadoop的基础操作,包括读取、写入、上传、下载和转换等等)。下面的分析,一般是按照先看效果图,然后再分析源码的方式进行。

1.配置
1.1首页
打开浏览器,输入项目发布的首页,即可看到下面的界面:
1.png


1.2 配置
配置功能其实就是让用户配置hadoop集群的,配置界面如下:
2.png


输入jobtracker 和namenode的机器名和端口号,点击提交,如果成功,就会给出红色提示信息。那么这样如何去验证的呢?
其实就是根据输入的参数得到一个jobclient对象,如果可以得到那么就说明集群是可用的,否则则说明不可用;具体代码如下:
  1. public static boolean initialJobClient(){  
  2.         if(HOST==null||JOBTRACKER_PORT==0||NAMENODE_PORT==0){  
  3.             return false;  
  4.         }  
  5.         log.info("Initial  job client begins...");  
  6.         boolean flag=true;  
  7.         try {  
  8.             InetSocketAddress jobTracker=new InetSocketAddress(HOST,JOBTRACKER_PORT);  
  9.             jobClient=new JobClient(jobTracker, getConf());  
  10.         } catch (IOException e) {  
  11.             flag=false;  
  12.             log.info("Job client can't be got\n"+e.getMessage());  
  13.         }  
  14.         if(flag){  
  15.             log.info("Initial  job client done!!!");  
  16.         }  
  17.         return flag;  
  18.     }  
复制代码



但是,如果当集群不可用的时候,就会一直连接,后台一直打印重试连接,前台一直等待,暂时还没有进行相应的处理。
2. hadoop读取删除
hadoop读取删除其实就是hadoop文件的一般操作,上传下载主要是使用FileSystem做的。
2.1 写入
可以按照下面界面输入相应的参数:
3.png

点击提交后,会有红色字体提示写入成功,或者可以在云平台进行数据查看(50070界面)。这里写入使用的是FSDataOutputStream的write功能,这个类有好几个write(不同数据类型,这里使用的是string的),不同的write方式,读取需要使用相应的方式。写入的代码如下:
  1. /**
  2.      * write string to hdfs file
  3.      * @param path
  4.      * @param data
  5.      * @return
  6.      */  
  7.     public static boolean writeToHdfs(String path,String data){  
  8.         boolean flag = true;  
  9.         Path filePath=new Path(path);   
  10.         FileSystem fs;   
  11.         FSDataOutputStream out=null;   
  12.         try {   
  13.             fs = FileSystem.get(filePath.toUri(),getConf());  
  14.             out = fs.create(filePath);   
  15.             out.writeUTF(data);   
  16.         } catch(Exception e){   
  17.             log.info("write to hdfs file"+filePath.toString()+" :"+e.getMessage());  
  18.             flag=false;  
  19.         }finally {   
  20.             try {  
  21.                 if(out!=null){  
  22.                     out.close();  
  23.                 }  
  24.             } catch (IOException e) {  
  25.                 log.info("close hdfs file "+filePath.toString()+" wrong\n"+e.getMessage());  
  26.                 flag=false;  
  27.             }   
  28.         }   
  29.         return flag;  
  30.     }  
复制代码



2.2 读取
这里读取的还是刚才写入的内容:
4.png

输入对应的目录,提交后即可看到内容。需要说明的是这里读取的数据一定要是用writeUTF的方式写入的,不然会是乱码。后面应该会修改这个,改的更加通用点。读取的代码如下:
  1. /**
  2.      * read hdfs file to String
  3.      * @param file
  4.      * @return
  5.      */  
  6.     public static String readHdfs(String path){  
  7.         String data="";  
  8.         Path filePath= new Path(path);  
  9.         FileSystem fs =null;   
  10.         FSDataInputStream in = null;   
  11.         try {   
  12.           fs=FileSystem.get(filePath.toUri(), getConf());   
  13.           in=fs.open(filePath);  
  14.           data= in.readUTF();   
  15.         }catch(Exception e){  
  16.             log.info("read hdfs file "+path+" error:\n"+e.getMessage());  
  17.         }finally {   
  18.             try {  
  19.                 if(in!=null){  
  20.                     in.close();  
  21.                 }  
  22.                 if(fs!=null){  
  23.                     fs.close();  
  24.                 }  
  25.             } catch (IOException e) {  
  26.                 log.info("close FileSystem error:\n"+e.getMessage());  
  27.             }  
  28.         }   
  29.         return data;  
  30.     }
复制代码


2.3 上传
上传其实就是使用FileSystem的copy函数而已,主要代码如下:
  1. Path in=new Path(localFile);  
  2.         Path out=new Path(hdfsFile);  
  3.         FileSystem fs=null;  
  4.         try {  
  5.             fs = FileSystem.get(URI.create(hdfsFile),getConf());  
  6.             fs.copyFromLocalFile(in,out);  
复制代码


2.4 下载
下载也是一样,使用的同样是FileSystem的函数,具体如下:
  1. Path in=new Path(hdfsFileName);  
  2. <span style="white-space:pre">      </span>Path out=new Path(localFileName);  
  3. <span style="white-space:pre">      </span>try {  
  4. <span style="white-space:pre">          </span>fs=FileSystem.get(URI.create(hdfsFileName),getConf());  
  5. <span style="white-space:pre">          </span>fs.copyToLocalFile(in, out);  
复制代码



3. hadoop 算法
3.1 序列文件读取
hadoop算法本来是想写成MR的形式的,但是写MR又要上传代码到hadoop集群,感觉麻烦,所以没有写成MR的模式,也就是说没有了监控(其实,后面还是要把代码.class文件打包上传到云平台的)。这里的序列文件读取,其实就是使用了Sequence.Reader而已。需要说明的是,这个读取只是针对一般的<key,value>格式,比如LongWritable、IntWritable等等。读取后的文件一个如下所示:
5.png


并且,如果没有序列文件的话,可以按照页面的提示,生成一个序列文件。
3.2 序列文件转换为txt
其实,就是多了一个步骤,把3.1的文件写入一个本地文件而已。写入本地文件使用的是PrintStream,具体如下:
  1. /**
  2.      * 写入字符串到本地文件
  3.      * @param localPath
  4.      * @param data
  5.      */  
  6.     public static boolean writeToLocal(String localPath,String data){  
  7.         boolean flag=true;  
  8.         File file=new File(localPath);  
  9.         if(!file.exists()){  
  10.             try {  
  11.                 new File(file.getParent()).mkdirs();  
  12.                 file.createNewFile();  
  13.             } catch (Exception e) {  
  14.                 log.info("创建文件失败\n"+e.getMessage());  
  15.             }  
  16.               
  17.         }  
  18.         PrintStream ps=null;  
  19.         try {  
  20.             ps = new PrintStream(new FileOutputStream(file));  
  21.             ps.print(data);  
  22.         } catch (FileNotFoundException e) {  
  23.             log.info("写入本地文件"+localPath+"失败\n"+e.getMessage());  
  24.             flag=false;  
  25.         }finally{  
  26.             ps.close();  
  27.         }  
  28.         return flag;  
  29.     }  
复制代码


4. mahout算法
mahout算法设计的思路其实和前面监控算法的程序是一样的,只是这里在读取云平台已有job列表的时候加了判断,如下:
  1. JobStatus[] jobStatusAll=HadoopUtil.jobClient.getAllJobs();  
  2.         JobStatus jobStatus=null;  
  3.         int id =0;  
  4.         String jobIden="";  
  5.         /**
  6.          * 防止当前云平台是第一次启动,这个时候没有任务列表,获取的jobStatus是空;
  7.          */  
  8.         if(jobStatusAll==null||jobStatusAll.length<=0){  
  9.             //修改TaskTracker代码,把集群启动时间写入hdfs,然后在这里读取出来  
  10.             id=0;  
  11.             jobIden=readJTStartTime();  
  12.               
  13.         }else{  
  14.             jobStatus=jobStatusAll[jobStatusAll.length-1];  
  15.             id=jobStatus.getJobID().getId();  
  16.             jobIden=jobStatus.getJobID().getJtIdentifier();  
  17.         }  
复制代码


这里,如果第一次启动的时候,文件列表是空的,这时就需要去读取启动时间,这里采取的方式可以参考:《hadoop 启动时间写入文件》。而这里的代码只是读取文件中的启动时间,然后去拼凑jobId而已。
监控的一般形式如下:
6.png

但是,也有可能会出现下面的界面:
7.png


看上面两个红色方框,第一个因为前一个任务刚启动然后就失败了,导致jobid没有写入到joblist中。第二个是因为页面刷新的时间和任务启动之间的间隔关系,如果job任务刚刚结束之前读取了一个job状态是running,然后隔了一段时间后,另外一个job任务启动了,那么之前的job任务的状态就会是running的状态,而没有更新。这里的逻辑还有点问题,有待更新。

目前mahout算法可以使用的是Canopy、Kmeans、协同过滤算法,模式挖掘还没有写。
另外需要注意的一点是,工程下载后需要把项目的.class文件打包上传到云平台才不会出错。因为在canopy、kmeans算法的时候,lz自己写了一个数据转换的程序,运行这两个算法的时候会首先调用数据转换,所以没有的话就会报错了。
最后说下数据格式吧:
canopy和kmeans算法的都是一样的,一般如下:
  1. 1,133,8  
  2. 5,122775,10  
  3. 9,18297,6  
  4. 9,50422,8  
  5. 9,80503,10  
  6. 9,110624,8  
  7. 9,147283,10  
  8. 9,218923,5  
  9. 18,102606,2  
  10. 25,58155,2  
复制代码


里面的逗号可以是其他字符,需要在算法参数页面设置。
协同过滤算法也是上面的数据格式,不过逗号不能改,且一定是userID,ItemID,prefValue的格式。

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条