分享

使用DistributedCache缓存jar完全没有?

Joker 发表于 2015-1-7 14:49:23 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 22 59799
我想使用MR连接mysql读取数据,但是不想每个节点都添加lib包,我就使用缓存,但是更本就没有缓存下来?
代码如下

  1. public static class Map extends Mapper<LongWritable, StudentRecord, LongWritable, Text>{
  2.                 @Override
  3.                 protected void map(LongWritable key, StudentRecord value,Context context)
  4.                                 throws IOException, InterruptedException {
  5.                         context.write(new LongWritable(value.id),
  6.                     new Text(value.toString()));
  7.                 }
  8.                
  9.         }
  10.        
  11.         public static class StudentRecord implements Writable,DBWritable{
  12.                 public int id;
  13.                 public String name;
  14.                 public String sex;
  15.                 public int age;
  16.                
  17.                
  18.                 public void readFields(ResultSet result) throws SQLException {
  19.                            this.id = result.getInt(1);
  20.                    this.name = result.getString(2);
  21.                    this.sex = result.getString(3);
  22.                    this.age = result.getInt(4);
  23.                        
  24.                 }
  25.                 public void write(PreparedStatement stmt) throws SQLException {
  26.                        
  27.                        stmt.setInt(1, this.id);
  28.                    stmt.setString(2, this.name);
  29.                    stmt.setString(3, this.sex);
  30.                    stmt.setInt(4, this.age);
  31.                 }
  32.                
  33.                 public void readFields(DataInput in) throws IOException {
  34.                        
  35.                          this.id = in.readInt();
  36.                  this.name = Text.readString(in);
  37.                  this.sex = Text.readString(in);
  38.                  this.age = in.readInt();
  39.                        
  40.                 }
  41.                 public void write(DataOutput out) throws IOException {
  42.                         out.writeInt(this.id);
  43.                 Text.writeString(out, this.name);
  44.                 Text.writeString(out, this.sex);
  45.                 out.writeInt(this.age);
  46.                 }
  47.                 @Override
  48.                 public String toString() {
  49.                          return new String("学号:" + this.id + "_姓名:" + this.name
  50.                          + "_性别:"+ this.sex + "_年龄:" + this.age);
  51.                 }
  52.                
  53.                
  54.                
  55.         }
  56.        
  57.        
  58.         public static void main(String[] args) throws Exception {
  59.                
  60.                 JobConf conf = new JobConf();
  61.                 // 这句话很关键
  62.                 conf.set("mapred.job.tracker", "10.0.1.201:9001");
  63.                
  64.                 Job job = Job.getInstance(conf, "aaa");
  65.                
  66.                 job.setJarByClass(Fuck.class);
  67.                
  68.                 // 非常重要,值得关注
  69.                 DistributedCache.addFileToClassPath(new Path("hdfs://xx.xx.xx.x:9000/lib/mysql-connector-java-5.1.28.jar"), job.getConfiguration());
  70.        
  71.                
  72.                 // 设置输入类型
  73.                 //job.setInputFormatClass(DBInputFormat.class);
  74.                
  75.                 // 设置输出类型
  76.                 job.setOutputKeyClass(LongWritable.class);
  77.                 job.setOutputValueClass(Text.class);
  78.                
  79.         // 设置Map和Reduce类
  80.                 job.setMapperClass(Map.class);
  81.                
  82.                 FileInputFormat.setInputPaths(job, new Path(args[0]));
  83.                
  84.         // 设置输出目录
  85.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  86.                
  87.         // 建立数据库连接
  88.         
  89.         DBConfiguration.configureDB(job.getConfiguration(), "com.mysql.jdbc.Driver",
  90.                         "jdbc:mysql://localhost:3306/school", "root", "root");
  91.         
  92.         // 读取"student"表中的数据
  93.         String[] fields = { "id", "name", "sex", "age" };
  94.         
  95.         DBInputFormat.setInput(job, StudentRecord.class, "student", null, "id", fields);
  96.         
  97.         System.out.println(job.waitForCompletion(true)?1:0);
  98.         }
  99.        
复制代码
异常是,找不到com.mysql.jdbc.Driver


已有(22)人评论

跳转到指定楼层
Joker 发表于 2015-1-7 14:52:01
  1. log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration.deprecation).
  2. log4j:WARN Please initialize the log4j system properly.
  3. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
  4. Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
  5.         at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setConf(DBInputFormat.java:168)
  6.         at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
  7.         at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
  8.         at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:488)
  9.         at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
  10.         at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
  11.         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
  12.         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
  13.         at java.security.AccessController.doPrivileged(Native Method)
  14.         at javax.security.auth.Subject.doAs(Unknown Source)
  15.         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
  16.         at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
  17.         at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1286)
  18.         at cn.base.jdbc.Fuck.main(Fuck.java:144)
  19. Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
  20.         at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.getConnection(DBInputFormat.java:194)
  21.         at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setConf(DBInputFormat.java:162)
  22.         ... 13 more
  23. Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
  24.         at java.net.URLClassLoader$1.run(Unknown Source)
  25.         at java.net.URLClassLoader$1.run(Unknown Source)
  26.         at java.security.AccessController.doPrivileged(Native Method)
  27.         at java.net.URLClassLoader.findClass(Unknown Source)
  28.         at java.lang.ClassLoader.loadClass(Unknown Source)
  29.         at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
  30.         at java.lang.ClassLoader.loadClass(Unknown Source)
  31.         at java.lang.Class.forName0(Native Method)
  32.         at java.lang.Class.forName(Unknown Source)
  33.         at org.apache.hadoop.mapreduce.lib.db.DBConfiguration.getConnection(DBConfiguration.java:148)
  34.         at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.getConnection(DBInputFormat.java:188)
  35.         ... 14 more
复制代码
回复

使用道具 举报

muyannian 发表于 2015-1-7 17:08:11
不明白楼主是什么意思?
楼主是想缓存环境??
一些jar包,让DistributedCache来完成。这个jar包是什么jar包?
如果说是运行环境所必须的,还是建议手动完成。有专门的集群管理工具,或则用shell分发。
回复

使用道具 举报

Joker 发表于 2015-1-7 17:10:18
本帖最后由 Joker 于 2015-1-7 17:15 编辑
muyannian 发表于 2015-1-7 17:08
不明白楼主是什么意思?
楼主是想缓存环境??
一些jar包,让DistributedCache来完成。这个jar包是什么j ...

我想做的是缓存Jar,然后再提交Job,job执行之前DistributedCache会进行分发这个Jar加入到ClassPath中,然后可以运行,不需要在每个节点上进行手动的分发
我缓存的Jar包是mysql-connector-java-5.1.28.jar
一个数据库的JAR
回复

使用道具 举报

Joker 发表于 2015-1-7 17:10:52
muyannian 发表于 2015-1-7 17:08
不明白楼主是什么意思?
楼主是想缓存环境??
一些jar包,让DistributedCache来完成。这个jar包是什么j ...

需求就是要使用DistributedCache来进行缓存
回复

使用道具 举报

muyannian 发表于 2015-1-7 18:32:28
Joker 发表于 2015-1-7 17:10
需求就是要使用DistributedCache来进行缓存


感觉楼主的程序有些问题,而且缓存还需要修改配置
package com.netqin.examples;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CacheDemo {

    public static void UseDistributedCacheBySymbolicLink() throws Exception {
        FileReader reader = new FileReader("hdfs://mail.py");
        BufferedReader br = new BufferedReader(reader);
        String s = null;
        while ((s = br.readLine()) != null) {
            System.out.println(s);
        }
        br.close();
        reader.close();
    }

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        protected void setup(Context context) throws IOException,
                InterruptedException {
            System.out.println("Now, use the distributed cache and syslink");
            try {
                UseDistributedCacheBySymbolicLink();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        DistributedCache.createSymlink(conf);
        String path = "/tmp/test/mail.py";
        Path filePath = new Path(path);
        String uriWithLink = filePath.toUri().toString() + "#" + "mail.py";//记得加上井号
        DistributedCache.addCacheFile(new URI(uriWithLink), conf);
      
        // Path p = new Path("/tmp/hadoop-0.20.2-capacity-scheduler.jar#hadoop-0.20.2-capacity-scheduler.jar");
        // DistributedCache.addArchiveToClassPath(p, conf);
      
      
        Job job = new Job(conf, "CacheDemo");
        job.setJarByClass(CacheDemo.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}



  1. <property>
  2.   <name>mapred.local.dir</name>
  3.   <value>${hadoop.tmp.dir}/mapred/local</value>
  4.   <description>The local directory where MapReduce stores intermediate
  5.   data files.  May be a comma-separated list of
  6.   directories on different devices in order to spread disk i/o.
  7.   Directories that do not exist are ignored.
  8.   </description>
  9. </property>
  10. <property>
  11.   <name>local.cache.size</name>
  12.   <value>10737418240</value> (默认大小:10GB)
  13.   <description>The limit on the size of cache you want to keep, set by default
  14.   to 10GB. This will act as a soft limit on the cache directory for out of band data.
  15.   </description>
  16. </property>
复制代码




注意符号链接
每个存储在HDFS中的文件被放到缓存中后都可以通过一个符号链接使用。
URI hdfs://namenode/test/input/file1#myfile 你可以在程序中直接使用myfile来访问 file1这个文件。 myfile是一个符号链接文件。










回复

使用道具 举报

howtodown 发表于 2015-1-7 18:48:06
本帖最后由 howtodown 于 2015-1-7 18:51 编辑
Joker 发表于 2015-1-7 17:10
需求就是要使用DistributedCache来进行缓存

你的代码顺序不对,把


// 非常重要,值得关注
                DistributedCache.addFileToClassPath(new Path("hdfs://xx.xx.xx.x:9000/lib/mysql-connector-java-5.1.28.jar"), job.getConfiguration());

放到
  1.    JobConf conf = new JobConf();
复制代码

前面。
更多内容,可以参考@muyannian的程序


回复

使用道具 举报

Joker 发表于 2015-1-7 20:43:06
howtodown 发表于 2015-1-7 18:48
本帖最后由 howtodown 于 2015-1-7 18:51 编辑

你的代码顺序不对,把

博主你好,我放在Job初始化前面,可以还是找不到类

  1.    
  2.         Configuration conf = new Configuration();
  3.        
  4.         Path jarPath = new Path("/lib/mysql-connector-java-5.1.13-bin.jar");
  5.         DistributedCache.addArchiveToClassPath(jarPath, conf);
  6.        
  7.         // 这句话很关键
  8.         conf.set("mapred.job.tracker", "10.0.1.201:9001");
  9.         // 非常重要,值得关注
  10.       
  11.         Job job = new Job(conf,"xxoo");
  12.         
  13.       
  14.         
  15.         job.setJarByClass(Test.class);
  16.         
  17.       
  18.         
  19.         // 设置输入类型
  20.         //job.setInputFormatClass(DBInputFormat.class);
  21.         
  22.         // 设置输出类型
  23.         job.setOutputKeyClass(LongWritable.class);
  24.         job.setOutputValueClass(Text.class);
  25.         
  26. // 设置Map和Reduce类
  27.         job.setMapperClass(Map.class);
  28.         
  29. //        FileInputFormat.setInputPaths(job, new Path(args[0]));
  30.         
  31. // 设置输出目录
  32. FileOutputFormat.setOutputPath(job, new Path("db_out"));
  33.         
  34. // 建立数据库连接
  35. DBConfiguration.configureDB(job.getConfiguration(), "com.mysql.jdbc.Driver",
  36.                 "jdbc:mysql://10.0.1.201:3306/school", "root", "root");
  37. // 读取"student"表中的数据
  38. String[] fields = { "id", "name", "sex", "age" };
  39. DBInputFormat.setInput(job, StudentRecord.class, "student", null, "id", fields);
  40. System.out.println(job.waitForCompletion(true)?0:1);
复制代码



回复

使用道具 举报

Joker 发表于 2015-1-7 20:44:08
回复

使用道具 举报

langke93 发表于 2015-1-7 22:07:55
Joker 发表于 2015-1-7 20:44
哥们,请问下你这个cache是修改哪个配置文件的
我用你上面代码,还是不行。还是找不到类

在你原先程序的基础上,把
// 非常重要,值得关注
                DistributedCache.addFileToClassPath(new Path("hdfs://xx.xx.xx.x:9000/lib/mysql-connector-java-5.1.28.jar"), job.getConfiguration());



改用这个函数

DistributedCache.addArchiveToClassPath(p, conf);
并且注意自己的conf对象
是job.getconf,还是conf
回复

使用道具 举报

123下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条