ww8605853 发表于 2018-5-22 21:49:45

hadoop之MapReduce的问题,请求解答!

刚接触的hadoop,作为一个资深的小白,刚踏入hadoop,搭建成功环境以后,且能访问的页面都已经成功访问成功,肯定就是非常希望能够运行一个实例来查看结果了,可是我不知道为什么就是出现了各种问题,在解决了很多问题后,终于让MapReduce的第一个实例运行不报错了,但是却也没有得到自己想要的结果。希望能够有人能够帮助下!

并没有报错,就是没有结果!
看下代码吧!
先新建了一个类来存数据
package com.wujie;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class KeyPari implements WritableComparable<KeyPari> {
    private int year;
    private int hot;

    public int getYear() {
      return year;
    }

    public void setYear(int year) {
      this.year = year;
    }

    public int getHot() {
      return hot;
    }

    public void setHot(int hot) {
      this.hot = hot;
    }

    @Override
    public String toString() {
      return "Patri{" +
                "year=" + year +
                ", hot=" + hot +
                '}';
    }

    @Override
    public int compareTo(KeyPari o) {
      int res = Integer.compare(year,o.getYear());
      if(res != 0){
            return res ;
      }
      return Integer.compare(hot,o.getHot());
    }

    /**
   * 理解为反序列化得过程
   * @param dataOutput
   * @throws IOException
   */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
      dataOutput.writeInt(year);
      dataOutput.writeInt(hot);
    }

    /**
   * 理解为序列化得过程
   * @param dataInput
   * @throws IOException
   */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
      this.year = dataInput.readInt();
      this.hot = dataInput.readInt();
    }


    @Override
    public int hashCode() {

      return new Integer(year +hot).hashCode();
    }
}

-----------------------------------------------------------------------
然后自定义一个排序
package com.wujie;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* 自定义排序
*/
public class SortHot extends WritableComparator {
    public SortHot() {
      super(KeyPari.class,true);
    }

    /**
   * 年份升序,温度降序
   * @param a
   * @param b
   * @return
   */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      KeyPari o1 = (KeyPari) a;
      KeyPari o2 = (KeyPari) b;
      int res = Integer.compare(o1.getYear(),o2.getYear());
      if(res != 0){
            return res;
      }
      return -Integer.compare(o1.getHot(),o2.getHot());//降序排序
    }
}

----------------------------------------------------------------------------------
进行一个按年份分组
package com.wujie;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* 自定义分组
*/
public class GroupHost extends WritableComparator {
    public GroupHost() {
      super(KeyPari.class,true);
    }

    /**
   * 年份相同为一组
   * @param a
   * @param b
   * @return
   */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      KeyPari o1 = (KeyPari) a;
      KeyPari o2 = (KeyPari) b;
      return Integer.compare(o1.getYear(),o2.getYear());

    }
}

---------------------------------------------------------------------------------------
自定义一个分区
package com.wujie;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
* 自定义分区
*/
public class FirstPartition extends Partitioner<KeyPari,Text> {
    @Override
    public int getPartition(KeyPari keyPari, Text value, int num) {
      return (keyPari.getYear()*127) % num;//按照年份分区
    }
}

----------------------------------------------------------------------
最后就是一个主类运行
package com.wujie;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;


import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class RunJob {
    public static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    static class HotMapper extends Mapper<LongWritable,Text,KeyPari,Text>{
      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] ss = line.split("\t");
            if(ss.length == 2){
                try {
                  Date date = simpleDateFormat.parse(ss);
                  Calendar calendar = Calendar.getInstance();
                  calendar.setTime(date);
                  int year = calendar.get(1);
                  String hot = ss.substring(0);
                  KeyPari keyPari = new KeyPari();
                  keyPari.setHot(Integer.parseInt(hot));
                  keyPari.setYear(year);
                  System.out.println("map:"+keyPari);
                  context.write(keyPari,value);
                } catch (Exception e) {
                  e.printStackTrace();
                }
            }
      }
    }

    static class HotReduce extends Reducer<KeyPari,Text,KeyPari,Text>{
      @Override
      protected void reduce(KeyPari key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text v:values
               ) {
                  System.out.println("reduce:---key:"+key);
                  System.out.println("reduce:--v:"+v);
                context.write(key,v);

            }
      }
    }

    public static void main(String[] args){
      Configuration configuration = new Configuration();
      try {
            Job job = new Job(configuration);
            job.setJobName("hot");
            job.setJarByClass(RunJob.class);
            job.setMapperClass(HotMapper.class);
            job.setReducerClass(HotReduce.class);
            job.setMapOutputKeyClass(KeyPari.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(3);
            job.setPartitionerClass(FirstPartition.class);
            job.setSortComparatorClass(SortHot.class);
            job.setGroupingComparatorClass(GroupHost.class);

            FileInputFormat.addInputPath(job,new Path("/usr/input/hot"));
            FileOutputFormat.setOutputPath(job,new Path("/usr/output/hot"));
            System.exit(job.waitForCompletion(true) ? 0:1);
      }catch (Exception e){
            e.printStackTrace();
      }
    }
}

--------------------------------------------------------------------------------------
运行步骤如下:
1.在hadoop中创建/usr/input/hot/的 文件夹
2.将数据文本上传至/usr/input/hot/的文件夹下
3.将程序打jar包,放到虚拟机中运行
4.最后就是读到数据,也将文件分块了,但是文件中却没有任何的数据

easthome001 发表于 2018-5-23 10:46:53

下面可能是一个原因。
/**
   * 理解为反序列化得过程
   * @param dataOutput
   * @throws IOException
   */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
      dataOutput.writeInt(year);
      dataOutput.writeInt(hot);
    }

相关推荐参考
Reducer端数据接收不到,也就是迭代器中貌似就没有数据!求解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23443


ww8605853 发表于 2018-5-25 21:47:20

终于找到了问题所在,在这里简单说一下,在这个过程中,因为是新手,对于排除错误的方法,其实就是那么几个,第一,看看官方列子是否可以执行,如果可以执行,说明环境没有太大问题,第二,检查自己的代码是否有问题,第三,就是检查一下你准备的数据格式是否正确了,我就是初心将数据格式弄错了!!
页: [1]
查看完整版本: hadoop之MapReduce的问题,请求解答!