pig2 发表于 2014-3-8 00:45:34

hadoop编程:hadoop与mysql数据库相连读出数据

本帖最后由 pig2 于 2014-3-8 01:32 编辑

我们在开发中有时候会用mysql,那么
1.hadoop如何与mysql数据库相连读出数据?
2.需要引入哪些包?
3.JobConf的作用是什么?
4.DBConfiguration.configureDB的作用是什么?
可以参考下面代码:

运行mysql创建数据库School,建立teacher表,并自行填写值DROP TABLE IF EXISTS `school`.`teacher`;

CREATE TABLE `school`.`teacher` (

`id` int(11) default NULL,

`name` char(20) default NULL,

`age` int(11) default NULL,

`departmentID` int(11) default NULL

) ENGINE=InnoDB DEFAULT CHARSET=latin1;

在eclipse中运行编译通过，但要加入必需的库以及hadoop 0.20.2的eclipse插件。

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

public class DBAccess2 {



      public static void main(String[] args) throws IOException {

             JobConf conf = new JobConf(DBAccess2.class);

             conf.setOutputKeyClass(LongWritable.class);

             conf.setOutputValueClass(Text.class);



             conf.setInputFormat(DBInputFormat.class);

             FileOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:9000/dbout"));

            

             DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",

                        "jdbc:mysql://localhost:3306/school","root","zxcvbnm");



             String [] fields = {"id", "name", "age", "departmentID"};

             DBInputFormat.setInput(conf, TeacherRecord.class, "teacher",

                        null, "id", fields);

            

             conf.setMapperClass(DBAccessMapper.class);

             conf.setReducerClass(IdentityReducer.class);

            

             JobClient.runJob(conf);

      }



}

注：请自行修改数据库连接语句、用户名、密码等。

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DBAccessMapper extends MapReduceBase implements

Mapper<LongWritable, TeacherRecord, LongWritable, Text> {



      @Override

      public void map(LongWritable key, TeacherRecord value,

                  OutputCollector<LongWritable, Text> collector, Reporter reporter)

                  throws IOException {

             // TODO Auto-generated method stub

collector.collect(new LongWritable(value.id),

new Text(value.toString()));

      }

   

}

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class TeacherRecord implements Writable, DBWritable{



      int id;

      String name;

      int age;

      int departmentID;

   

      @Override

      public void readFields(DataInput in) throws IOException {

             // TODO Auto-generated method stub

             this.id = in.readInt();

             this.name = Text.readString(in);

             this.age = in.readInt();

             this.departmentID = in.readInt();

      }



      @Override

      public void write(DataOutput out) throws IOException {

             // TODO Auto-generated method stub

             out.writeInt(this.id);

             Text.writeString(out, this.name);

             out.writeInt(this.age);

             out.writeInt(this.departmentID);

      }



      @Override

      public void readFields(ResultSet result) throws SQLException {

             // TODO Auto-generated method stub

             this.id = result.getInt(1);

             this.name = result.getString(2);

             this.age = result.getInt(3);

             this.departmentID = result.getInt(4);

      }



      @Override

      public void write(PreparedStatement stmt) throws SQLException {

             // TODO Auto-generated method stub

             stmt.setInt(1, this.id);

             stmt.setString(2, this.name);

             stmt.setInt(3, this.age);

             stmt.setInt(4, this.departmentID);

      }



      @Override

      public String toString() {

             // TODO Auto-generated method stub

             return new String(this.name + " " + this.age + " " + this.departmentID);

      }



}

yoki 发表于 2014-11-2 16:34:01

Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
        at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setConf(DBInputFormat.java:168)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
        at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:687)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:520)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:512)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:394)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
        at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:562)
        at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:557)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:557)
        at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:548)
        at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:833)
        at mysql_jdbc.main(mysql_jdbc.java:188)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
        at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.getConnection(DBInputFormat.java:194)
        at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setConf(DBInputFormat.java:162)
        ... 21 more
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
        at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:260)
        at org.apache.hadoop.mapreduce.lib.db.DBConfiguration.getConnection(DBConfiguration.java:148)
        at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.getConnection(DBInputFormat.java:188)
        ... 22 more
为什么会出现这种错误??
页: [1]
查看完整版本: hadoop编程:hadoop与mysql数据库相连读出数据