A MapReduce problem in Hadoop, asking for help!
I'm brand new to Hadoop. After getting the environment set up successfully (every web UI I should be able to reach loads fine), I naturally wanted to run an example and see some results. Instead I ran into all sorts of problems, and after working through most of them, my first MapReduce job finally runs without errors, but it still doesn't produce the result I expect. Nothing is reported as failing; there is just no output! Hoping someone can help.
Here is the code.
First, a class to hold the data:
package com.wujie;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class KeyPari implements WritableComparable<KeyPari> {
    private int year;
    private int hot;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    @Override
    public String toString() {
        return "KeyPari{" +
                "year=" + year +
                ", hot=" + hot +
                '}';
    }

    @Override
    public int compareTo(KeyPari o) {
        int res = Integer.compare(year, o.getYear());
        if (res != 0) {
            return res;
        }
        return Integer.compare(hot, o.getHot());
    }

    /**
     * Serialization: write the fields out.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeInt(hot);
    }

    /**
     * Deserialization: read the fields back, in the same order they were written.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.year = dataInput.readInt();
        this.hot = dataInput.readInt();
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(year + hot);
    }
}
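As a quick sanity check (a sketch of my own, not from the post), the write()/readFields() pair can be exercised locally without a running cluster, because it only needs plain DataOutput/DataInput streams:

package com.wujie;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class KeyPariRoundTrip {
    public static void main(String[] args) throws Exception {
        KeyPari in = new KeyPari();
        in.setYear(1949);
        in.setHot(34);

        // Serialize to a byte buffer, exactly as the framework would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize into a fresh object and compare the two.
        KeyPari out = new KeyPari();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(in + " -> " + out); // the fields should match
    }
}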
-----------------------------------------------------------------------
Then a custom sort comparator:
package com.wujie;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom sort comparator.
 */
public class SortHot extends WritableComparator {

    public SortHot() {
        super(KeyPari.class, true);
    }

    /**
     * Year ascending, temperature descending.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPari o1 = (KeyPari) a;
        KeyPari o2 = (KeyPari) b;
        int res = Integer.compare(o1.getYear(), o2.getYear());
        if (res != 0) {
            return res;
        }
        return -Integer.compare(o1.getHot(), o2.getHot()); // negate for descending order
    }
}
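A small local check of the intended ordering (again my own sketch, runnable with the Hadoop client jars on the classpath):

package com.wujie;

import java.util.ArrayList;
import java.util.List;

public class SortHotDemo {
    public static void main(String[] args) {
        List<KeyPari> keys = new ArrayList<>();
        keys.add(make(1950, 10));
        keys.add(make(1949, 34));
        keys.add(make(1950, 22));

        SortHot comparator = new SortHot();
        keys.sort((a, b) -> comparator.compare(a, b));

        // Expected order: 1949/34, then 1950/22, then 1950/10
        keys.forEach(System.out::println);
    }

    static KeyPari make(int year, int hot) {
        KeyPari k = new KeyPari();
        k.setYear(year);
        k.setHot(hot);
        return k;
    }
}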
----------------------------------------------------------------------------------
Next, grouping by year:
package com.wujie;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom grouping comparator.
 */
public class GroupHost extends WritableComparator {

    public GroupHost() {
        super(KeyPari.class, true);
    }

    /**
     * Keys with the same year belong to the same group.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPari o1 = (KeyPari) a;
        KeyPari o2 = (KeyPari) b;
        return Integer.compare(o1.getYear(), o2.getYear());
    }
}
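To see what the grouping comparator buys you (a sketch of my own, not from the post): two keys with the same year compare as equal here, so on the reduce side their values are fed to a single reduce() call even though the full keys differ:

package com.wujie;

public class GroupHostDemo {
    public static void main(String[] args) {
        KeyPari a = new KeyPari();
        a.setYear(1949);
        a.setHot(34);

        KeyPari b = new KeyPari();
        b.setYear(1949);
        b.setHot(20);

        // 0 means "same group": both records reach one reduce() call.
        System.out.println(new GroupHost().compare(a, b)); // prints 0
    }
}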
---------------------------------------------------------------------------------------
And a custom partitioner:
package com.wujie;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioner.
 */
public class FirstPartition extends Partitioner<KeyPari, Text> {
    @Override
    public int getPartition(KeyPari keyPari, Text value, int num) {
        return (keyPari.getYear() * 127) % num; // partition by year
    }
}
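One hedged aside of mine: getPartition() must return a value in [0, num). With four-digit years, year * 127 stays comfortably positive, so the modulo above is safe; if the key could ever make the product negative, the usual Hadoop idiom (the one HashPartitioner uses) is to mask off the sign bit first:

@Override
public int getPartition(KeyPari keyPari, Text value, int num) {
    // Mask to a non-negative value before the modulo, as HashPartitioner does.
    return (keyPari.getYear() * 127 & Integer.MAX_VALUE) % num;
}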
----------------------------------------------------------------------
Finally, the driver class:
package com.wujie;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class RunJob {

    public static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    static class HotMapper extends Mapper<LongWritable, Text, KeyPari, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Each input line: "yyyy-MM-dd HH:mm:ss<TAB>temperature"
            String[] ss = line.split("\t");
            if (ss.length == 2) {
                try {
                    Date date = simpleDateFormat.parse(ss[0]);
                    Calendar calendar = Calendar.getInstance();
                    calendar.setTime(date);
                    int year = calendar.get(Calendar.YEAR);
                    String hot = ss[1];
                    KeyPari keyPari = new KeyPari();
                    keyPari.setHot(Integer.parseInt(hot));
                    keyPari.setYear(year);
                    System.out.println("map:" + keyPari);
                    context.write(keyPari, value);
                } catch (Exception e) {
                    // A line that fails to parse is logged and silently dropped.
                    e.printStackTrace();
                }
            }
        }
    }

    static class HotReduce extends Reducer<KeyPari, Text, KeyPari, Text> {
        @Override
        protected void reduce(KeyPari key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text v : values) {
                System.out.println("reduce:---key:" + key);
                System.out.println("reduce:--v:" + v);
                context.write(key, v);
            }
        }
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration); // new Job(conf) is deprecated
            job.setJobName("hot");
            job.setJarByClass(RunJob.class);
            job.setMapperClass(HotMapper.class);
            job.setReducerClass(HotReduce.class);
            job.setMapOutputKeyClass(KeyPari.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(3);
            job.setPartitionerClass(FirstPartition.class);
            job.setSortComparatorClass(SortHot.class);
            job.setGroupingComparatorClass(GroupHost.class);
            FileInputFormat.addInputPath(job, new Path("/usr/input/hot"));
            FileOutputFormat.setOutputPath(job, new Path("/usr/output/hot"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
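Two hedged observations of my own about the driver, beyond the post itself. First, the job never calls setOutputKeyClass()/setOutputValueClass(); TextOutputFormat happens to tolerate that because it only calls toString() on whatever it is given, but setting them explicitly (job.setOutputKeyClass(KeyPari.class); job.setOutputValueClass(Text.class);) is safer. Second, if the goal is the highest temperature per year, which the descending hot sort plus year grouping strongly suggests, the reducer only needs the first value of each group. A sketch of that variant, as a drop-in replacement for HotReduce inside RunJob:

static class MaxHotReduce extends Reducer<KeyPari, Text, KeyPari, Text> {
    @Override
    protected void reduce(KeyPari key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Within a year group, values arrive sorted by hot descending,
        // so the first record is that year's maximum.
        for (Text v : values) {
            context.write(key, v);
            break;
        }
    }
}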
--------------------------------------------------------------------------------------
Steps to run:
1. Create the /usr/input/hot/ directory in HDFS.
2. Upload the data file into /usr/input/hot/.
3. Package the program as a jar and run it on the VM (sample commands below).
4. Result: the job reads the data and the input is split as expected, but the output files contain no data.
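For reference, the steps above as shell commands (my own sketch; data.txt and hot.jar are placeholder names, not from the post):

hdfs dfs -mkdir -p /usr/input/hot
hdfs dfs -put data.txt /usr/input/hot/
hadoop jar hot.jar com.wujie.RunJob
hdfs dfs -cat /usr/output/hot/part-r-00000   # inspect one reducer's output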
One cause I suspected was the write() method:

    /**
     * Serialization: write the fields out.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeInt(hot);
    }
A related thread for reference:
"The reducer receives no data; the iterator seems to have nothing in it. Help!"
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23443
Update: I finally found the problem, so here is a short summary. As a beginner, troubleshooting really comes down to a few checks. First, see whether the official examples run; if they do, the environment is basically fine. Second, review your own code for mistakes. Third, check that the data you prepared is in the right format. In my case, I had simply gotten the data format wrong!
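For anyone hitting the same symptom: the mapper above expects each line to be exactly two tab-separated fields, a "yyyy-MM-dd HH:mm:ss" timestamp and an integer temperature. Anything else either fails the ss.length == 2 check or throws inside the try block and is swallowed, so the map emits nothing and the output files stay empty. Lines that would parse look like this (my illustration, not the poster's actual data; the separator is a literal tab):

1949-10-01 14:21:02	34
1950-01-22 11:55:39	10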