My goal is to read data from a file, process it with MapReduce, and then import the result data into a MySQL database, but the error message suggests a data type mismatch.
The code is as follows:[mw_shl_code=java,true]package com.sun.mysql;
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/**
 * Write the MapReduce result data into MySQL.
 */
public class mysqlmapre {
    /**
     * Custom DBWritable implementation.
     * TblsWritable holds the data to be written into MySQL.
     */
    public static class TblsWritable implements Writable, DBWritable {
        String word;
        int count;

        public TblsWritable() {
        }

        public TblsWritable(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, this.word);
            statement.setInt(2, this.count);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.word = resultSet.getString(1);
            this.count = resultSet.getInt(2);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.word);
            out.writeInt(this.count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.word = in.readUTF();
            this.count = in.readInt();
        }

        @Override
        public String toString() {
            return this.word + " " + this.count;
        }
    }
    public static class ConnMysqlMapper extends Mapper<Object, Text, Text, IntWritable> {
        // TblsWritable is the custom type, i.e. the DBWritable implementation above.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Each line arrives as <offset of the first character, line content>.
            // The key is of no real use to this program, so only the value is
            // processed: every token matching one of the filter words loaded
            // into input[] is emitted as the map output with a count of 1.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String token = itr.nextToken();
                for (int i = 0; i < input.length; i++) {
                    if (token.compareToIgnoreCase(input[i]) == 0) {
                        word.set(token);
                        context.write(word, one);
                    }
                }
            }
        }
    }
    public static class ConnMysqlReducer extends Reducer<LongWritable, Text, TblsWritable, TblsWritable> {
        public void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // The key/value pairs received here are exactly the fields to be
            // written to the database, so in reduce:
            // the first argument to write is the custom type TblsWritable,
            // assembled from the key and value and then written to the database;
            // the first argument already covers everything to be output, so the
            // second argument is unused and set to null.
            for (Iterator<IntWritable> itr = values.iterator(); itr.hasNext();) {
                context.write(new TblsWritable(key.toString(), itr.next().get()), null);
            }
        }
    }
    public static String input[] = null;

    public static void readTxtFile(String filePath) {
        try {
            File file = new File(filePath);
            if (file.isFile() && file.exists()) { // check that the file exists
                InputStreamReader read = new InputStreamReader(new FileInputStream(file));
                BufferedReader bufferedReader = new BufferedReader(read);
                String lineTxt = null;
                while ((lineTxt = bufferedReader.readLine()) != null) {
                    input = lineTxt.split(" ");
                }
                bufferedReader.close();
            } else {
                System.out.println("The specified file could not be found");
            }
        } catch (Exception e) {
            System.out.println("Error while reading the file");
            e.printStackTrace();
        }
    }
    @SuppressWarnings("deprecation")
    public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        DistributedCache.addFileToClassPath(new Path("/tmp/mysql-connector-java-5.1.40-bin.jar"), conf);
        String filePath = "/usr/local/hadoop/ab.txt";
        readTxtFile(filePath);
        args = new String[]{"hdfs://master:9000/user/hadoop/input/a.txt"};
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.1.121:3306/mapreducetest", "root", "123456");
        Job job = Job.getInstance(conf, "test mysql connection");
        job.setJarByClass(mysqlmapre.class);
        job.setMapperClass(ConnMysqlMapper.class);
        job.setReducerClass(ConnMysqlReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        DBOutputFormat.setOutput(job, "t1", "word", "count");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}[/mw_shl_code]
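For reference, the target table t1 has the two columns passed to DBOutputFormat.setOutput above. It was set up along these lines (a minimal sketch; the VARCHAR/INT column types are an assumption matching TblsWritable's String/int fields):
[mw_shl_code=java,true]import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Minimal sketch of the target table setup. The JDBC URL and credentials
// mirror the job configuration above; the column types are assumptions
// matching TblsWritable's String/int fields.
public class CreateTableSketch {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.1.121:3306/mapreducetest", "root", "123456");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(
                    "CREATE TABLE IF NOT EXISTS t1 (word VARCHAR(255), count INT)");
        }
    }
}[/mw_shl_code]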
The error reported at runtime is:
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-03-24 15:00:47,361 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1380292941_0001 failed with state FAILED due to: NA
2017-03-24 15:00:47,424 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 35
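One thing I notice in the trace is that the call goes through org.apache.hadoop.mapreduce.Reducer.reduce (the base class) rather than ConnMysqlReducer.reduce. In case it helps with diagnosis: the cast itself happens inside DBOutputFormat's record writer, which treats every reduce output key as a DBWritable before binding it onto the INSERT statement, roughly like this (a simplified sketch of the library behavior, not the actual Hadoop source):
[mw_shl_code=java,true]import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

// Simplified sketch of what DBOutputFormat's DBRecordWriter does with each
// reduce output pair: the key is assumed to implement DBWritable and is
// bound onto the batched INSERT statement; the value is ignored entirely.
public class DBRecordWriterSketch<K extends DBWritable, V> {
    private final PreparedStatement statement;

    public DBRecordWriterSketch(PreparedStatement statement) {
        this.statement = statement;
    }

    public void write(K key, V value) throws IOException {
        try {
            // If the job's actual reduce output key is a Text rather than a
            // DBWritable, the framework's cast to K fails at this point.
            key.write(statement);
            statement.addBatch();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}[/mw_shl_code]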
Could someone please take a look at where the problem is? I've tried many times without success... Thanks a lot!