hadoop新手,请大神帮忙!
环境:win7 x64 + eclipse + VMWare + hadoop1.2.1 + jdk1.6
程序结构:
工程结构
问题描述:有2个class,KPIIP和KPIPV,分别有各自map/reduce方法,首次执行任何一个(如KPIPV),均无异常,但执行(KPIPV)一次之后再执行另一个(KPIIP)时,出现异常,异常信息如下:
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local571336673_0001_m_000000_0 is done. And is in the process of commiting
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息:
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local571336673_0001_m_000000_0' done.
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable run
信息: Finishing task: attempt_local571336673_0001_m_000000_0
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable run
信息: Starting task: attempt_local571336673_0001_m_000001_0
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.Task initialize
信息: Using ResourceCalculatorPlugin : null
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask runNewMapper
信息: Processing split: hdfs://192.168.8.8:9000/hdfs/log_kpi/pv:0+0
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 79691776/99614720
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 262144/327680
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job run
信息: Map task executor complete.
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job run
警告: job_local571336673_0001
java.lang.Exception: java.io.FileNotFoundException: File does not exist: /hdfs/log_kpi/pv
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.io.FileNotFoundException: File does not exist: /hdfs/log_kpi/pv
at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.fetchLocatedBlocks(DFSClient.java:2006)
at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.openInfo(DFSClient.java:1975)
at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.<init>(DFSClient.java:1967)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:735)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:165)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:436)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:75)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:521)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: map 50% reduce 0%
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local571336673_0001
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Counters: 13
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: FileSystemCounters
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: FILE_BYTES_READ=276
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: HDFS_BYTES_READ=3025757
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: FILE_BYTES_WRITTEN=401697
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: File Input Format Counters
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Bytes Read=3025757
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Map-Reduce Framework
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Map output materialized bytes=332060
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Combine output records=0
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Map input records=14619
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Spilled Records=13535
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Map output bytes=304984
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Total committed heap usage (bytes)=223870976
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Combine input records=0
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Map output records=13535
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: SPLIT_RAW_BYTES=115
程序:[mw_shl_code=java,true]
package org.pnlorf.hadoop.mr.kpi;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class KPIIP {
public static class KPIIPMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
KPI kpi = KPI.parser(value.toString());
if (kpi.isValid()) {
context.write(new Text(kpi.getRemote_addr()), new LongWritable(
1));
}
}
}
public static class KPIIPReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s,
Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (LongWritable v2 : v2s) {
sum += v2.get();
}
context.write(new Text(k2), new LongWritable(sum));
}
}
public static void main(String[] args) throws Exception {
String input = "hdfs://192.168.8.8:9000/hdfs/log_kpi/";
String output = "hdfs://192.168.8.8:9000/hdfs/log_kpi/ip";
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create(input), conf);
Path outPath = new Path(output);
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
Job job = new Job(conf, KPIIP.class.getSimpleName());
// 1.1 读取hdfs,读取文件位于哪里
FileInputFormat.setInputPaths(job, input);
// 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
job.setInputFormatClass(TextInputFormat.class);
// 1.2 指定自定义的map类
job.setMapperClass(KPIIPMapper.class);
// map输出的<k,v>类型。如果<k3, v3>的类型与<k2, v2>类型一直,则可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 1.3 分区
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
// TODO:1.4 排序、分组
// TODO:1.5 规约
// 2.2 指定自定义reduce类
job.setReducerClass(KPIIPReducer.class);
// 指定reduce的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 2.3 指定写出到哪里
FileOutputFormat.setOutputPath(job, outPath);
// 指定输出文件的格式化类
job.setOutputFormatClass(TextOutputFormat.class);
// 把job提交给JobTracker运行
job.waitForCompletion(true);
}
}
[/mw_shl_code]
[mw_shl_code=java,true]
package org.pnlorf.hadoop.mr.kpi;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class KPIPV {
public static class KPIPVMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
KPI kpi = KPI.parser(value.toString());//KPI.filterPVs(value.toString());
if (kpi.isValid()) {
context.write(new Text(kpi.getRequest()), new LongWritable(1));
}
}
}
public static class KPIPVReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s,
Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (LongWritable v2 : v2s) {
sum += v2.get();
}
context.write(k2, new LongWritable(sum));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String input = "hdfs://192.168.8.8:9000/hdfs/log_kpi/";
String output = "hdfs://192.168.8.8:9000/hdfs/log_kpi/pv";
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create(input), conf);
Path outPath = new Path(output);
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
Job job = new Job(conf, KPIPV.class.getSimpleName());
// 1.1 指定读取的文件位于哪里
FileInputFormat.setInputPaths(job, input);
// 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
job.setInputFormatClass(TextInputFormat.class);
// 1.2 指定自定义的map类
job.setMapperClass(KPIPVMapper.class);
// map输出的<k,v>类型。如果<k3, v3>的类型与<k2, v2>类型一直,则可以省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 1.3 分区
job.setPartitionerClass(HashPartitioner.class);
// 有一个reduce任务执行
job.setNumReduceTasks(1);
// 1.4 TODO:排序、分组
// 1.5 TODO:规约
// 2.2 指定自定义reduce类
job.setReducerClass(KPIPVReducer.class);
// 指定reduce的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 2.3 指定写出到哪里
FileOutputFormat.setOutputPath(job, outPath);
// 指定输出文件的格式化类
job.setOutputFormatClass(TextOutputFormat.class);
// 把job提交给JobTracker运行
job.waitForCompletion(true);
}
}
[/mw_shl_code]
请大神帮忙解答!!!!!
|