Hadoop programming: connecting Hadoop to MySQL and reading data
We sometimes use MySQL during development, which raises a few questions:
1. How do we connect Hadoop to a MySQL database and read data from it?
2. Which packages need to be imported?
3. What is JobConf for?
4. What is DBConfiguration.configureDB for?
The code below can be used as a reference.
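Before the full listing, a brief answer to questions 2 to 4. The imports at the top of each class below list the packages that are needed (mainly org.apache.hadoop.mapred and org.apache.hadoop.mapred.lib.db, plus the MySQL JDBC driver jar). JobConf describes the whole MapReduce job to the framework: input and output formats, mapper and reducer classes, key/value types, and so on. DBConfiguration.configureDB stores the JDBC driver class, connection URL, user name and password in that JobConf so that DBInputFormat can open a connection later. The snippet below is only a sketch of those database-related calls, using the same names and credentials as the complete example that follows:

JobConf conf = new JobConf(DBAccess2.class);   // describes the whole job to the framework
conf.setInputFormat(DBInputFormat.class);      // read records from a database table instead of HDFS files

// Store the JDBC driver class, connection URL, user and password in the JobConf.
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost:3306/school", "root", "zxcvbnm");

// Declare which table and columns to read, and which column the splits are ordered by.
DBInputFormat.setInput(conf, TeacherRecord.class, "teacher",
        null, "id", new String[] {"id", "name", "age", "departmentID"});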
In MySQL, create the School database and the teacher table, and insert a few rows of your own:

DROP TABLE IF EXISTS `school`.`teacher`;
CREATE TABLE `school`.`teacher` (
`id` int(11) default NULL,
`name` char(20) default NULL,
`age` int(11) default NULL,
`departmentID` int(11) default NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

The code compiles and runs in Eclipse, but the required libraries and the Hadoop 0.20.2 Eclipse plugin must be added first (see also the runtime classpath note after the TeacherRecord class).

DBAccess2.java:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
public class DBAccess2 {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(DBAccess2.class);

        // Key/value types emitted by the job.
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        // Read the input records from a database table instead of HDFS files.
        conf.setInputFormat(DBInputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://localhost:9000/dbout"));

        // JDBC driver class, connection URL, user and password.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/school", "root", "zxcvbnm");

        // Table name, WHERE conditions (none), ORDER BY column, and the columns to read.
        String[] fields = {"id", "name", "age", "departmentID"};
        DBInputFormat.setInput(conf, TeacherRecord.class, "teacher",
                null, "id", fields);

        conf.setMapperClass(DBAccessMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        JobClient.runJob(conf);
    }
}

Note: adjust the database connection URL, user name, password, and so on to match your own environment.

DBAccessMapper.java:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class DBAccessMapper extends MapReduceBase implements
        Mapper<LongWritable, TeacherRecord, LongWritable, Text> {

    @Override
    public void map(LongWritable key, TeacherRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        // Emit the teacher's id as the key and the formatted record as the value.
        collector.collect(new LongWritable(value.id),
                new Text(value.toString()));
    }
}

TeacherRecord.java:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
public class TeacherRecord implements Writable, DBWritable {

    int id;
    String name;
    int age;
    int departmentID;

    // Writable: deserialize the record from Hadoop's binary stream.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    // Writable: serialize the record to Hadoop's binary stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    // DBWritable: populate the record from one row of the JDBC result set,
    // in the same column order as the fields array passed to DBInputFormat.setInput.
    @Override
    public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
        this.age = result.getInt(3);
        this.departmentID = result.getInt(4);
    }

    // DBWritable: bind the record to a prepared statement (used when writing back to a database).
    @Override
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
        stmt.setInt(3, this.age);
        stmt.setInt(4, this.departmentID);
    }

    @Override
    public String toString() {
        return this.name + " " + this.age + " " + this.departmentID;
    }
}
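As noted above, the MySQL JDBC driver (the mysql-connector-java jar) must also be available at runtime, both to the program that submits the job and to the map tasks on the cluster, not only on the Eclipse build path. One common way to make it visible to the tasks, assuming the jar has already been uploaded to HDFS (the path and version below are placeholders), is to add it to the task classpath in the driver program before submitting the job:

// Sketch only: adjust the HDFS path and jar version to wherever the driver was uploaded,
// e.g. with: hadoop fs -put mysql-connector-java-5.1.18.jar /lib/
// Requires: import org.apache.hadoop.filecache.DistributedCache;
DistributedCache.addFileToClassPath(
        new Path("/lib/mysql-connector-java-5.1.18.jar"), conf);

If the driver cannot be found, job submission fails with a ClassNotFoundException like the one below.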
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setConf(DBInputFormat.java:168)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:687)
at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:520)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:512)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:394)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:562)
at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:557)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:557)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:548)
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:833)
at mysql_jdbc.main(mysql_jdbc.java:188)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.getConnection(DBInputFormat.java:194)
at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setConf(DBInputFormat.java:162)
... 21 more
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:260)
at org.apache.hadoop.mapreduce.lib.db.DBConfiguration.getConnection(DBConfiguration.java:148)
at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.getConnection(DBInputFormat.java:188)
... 22 more
Why does this error occur?
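Judging from the stack trace, the exception is thrown while DBInputFormat computes the input splits inside JobSubmitter, that is, in the JVM that submits the job, before any map task starts. The mysql-connector-java jar is therefore missing from the classpath of the submitting program itself; shipping it with DistributedCache only helps the tasks. Adding the jar to the client classpath resolves it, for example by putting it on HADOOP_CLASSPATH when launching with the hadoop command, or by adding it to the project's classpath when launching from Eclipse. A purely illustrative check that fails fast with a clearer message before the job is submitted:

// Sketch: verify the JDBC driver is visible to the submitting JVM before calling JobClient.runJob.
try {
    Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
    throw new IOException("mysql-connector-java is not on the client classpath", e);
}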