若余相思28 发表于 2018-6-1 18:18:10

大数据mapreduce疑惑

Parameter接口:
package com.xiaohong.TongJi;

public interface Parameter {
      /**
         * LenFields is the length of fields, or columns; it need to be modified for varied
         */
      public final int LenFields = 4;
      /**
         * iFields for min()
         * iFields for max()
         * iFields for sum()
         * iFields for sum2()
         */
      public final int LenStat = 4;

      public final int iMin = 0x80000000;
      public final int iMax = 0x7FFFFFFF;
      public final float fMin = (float) -3.4E38;
      public final float fMax = (float) 3.4E38;
}


map端:
package com.xiaohong.TongJi;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TongJiMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> implements Parameter {

      /**
         * The iFields[][] is the Array for Keys to present the min(), max(), sum()
         * of each field in the input. iFields for min() iFields for max()
         * iFields for sum() iFields for sum2()
         */

      private static IntWritable iFields[][] = new IntWritable;
      private static float min[] = new float;
      private static float max[] = new float;
      private static IntWritable iwCnt = new IntWritable(LenStat * LenFields);
      private final static FloatWritable one = new FloatWritable(1);

      public TongJiMapper() {
                for (int i = 0; i < LenStat; i++) {
                        for (int j = 0; j < LenFields; j++) {
                              iFields = new IntWritable(i * LenFields + j);
                        }
                }

                for (int j = 0; j < LenFields; j++) {
                        min = fMax; // the maximum integer
                        max = fMin; // the minimum integer
                }

      }

      @Override
      protected void map(LongWritable key, Text values,
                        Mapper<LongWritable, Text, IntWritable, FloatWritable>.Context context)
                        throws IOException, InterruptedException {

                /*
               * 原始数据:
               *         9.0000000e+000      1.0000000e+000      2.0000000e+000      4.0000000e+000
                        3.0000000e+000      8.0000000e+000      4.0000000e+000      8.0000000e+000
                        6.0000000e+000      5.0000000e+000      9.0000000e+000      1.0000000e+000
                        5.0000000e+000      6.0000000e+000      9.0000000e+000      2.0000000e+000
                        9.0000000e+000      8.0000000e+000      4.0000000e+000      2.0000000e+000
                        7.0000000e+000      9.0000000e+000      9.0000000e+000      2.0000000e+000
                        5.0000000e+000      7.0000000e+000      1.0000000e+000      6.0000000e+000
               */
                StringTokenizer st = new StringTokenizer(values.toString().toLowerCase(), " \t,;");
                float iTmp;
                for (int j = 0; j < LenFields; j++) {
                        /** handle each field. */
                        iTmp = Float.parseFloat(st.nextToken());
                        /**
                         * for min(), this judgement just output
                         * about 37 <key,value> pairs in 100,000
                         * records.
                         */
                        
                        if (min > iTmp) {
                              min = iTmp;
                              context.write(iFields, new FloatWritable(min));
                        }
                        if (max < iTmp) { /** for max() */
                              max = iTmp;
                              context.write(iFields, new FloatWritable(max));
                        }
                        context.write(iFields, new FloatWritable(iTmp));
                        /** for sum() */
                        context.write(iFields, new FloatWritable(iTmp * iTmp));/** for sum2() */
                }
                context.write(iwCnt, one); /** for cnt() */

      }

}


reducer端:
package com.xiaohong.TongJi;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TongJiReducer extends Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>
                implements Parameter {

      @Override
      protected void reduce(IntWritable K, Iterable<FloatWritable> values,
                        Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>.Context context)
                        throws IOException, InterruptedException {

                float min = iMax;
                float max = iMin;
                float sum = 0;
                long iCnt = 0;
                int iCategory = K.get() / LenFields; // restore the category from Keys.
                switch (iCategory) {
                case 0:/** min() */
                        for (FloatWritable value : values) {
                              if (min > value.get()) {
                                        min = value.get();
                              }
                        }
                        context.write(K, new FloatWritable(min));
                        break;
                case 1:/** max() */
                        for (FloatWritable value : values) {
                              if (max < value.get()) {
                                        max = value.get();
                              }
                        }
                        context.write(K, new FloatWritable(max));
                        break;
                case 2:/** sum() */
                        for (FloatWritable value : values) {
                              sum += value.get();
                        }
                        context.write(K, new FloatWritable(sum));
                        break;
                case 3: /** sum2() */
                        for (FloatWritable value : values) {
                              sum += value.get();
                        }
                        context.write(K, new FloatWritable(sum));
                        break;
                case 4:
                        for (FloatWritable value : values) {
                              iCnt += value.get();
                        }
                        context.write(K, new FloatWritable(iCnt));
                        break;
                } // switch

      }

}


yarn客户端:
package com.xiaohong.TongJi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobSubMitter {

      public static void main(String[] args) throws Exception {

                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);

                job.setJarByClass(JobSubMitter.class);

                job.setMapperClass(TongJiMapper.class);
                job.setReducerClass(TongJiReducer.class);

                job.setMapOutputKeyClass(IntWritable.class);
                job.setMapOutputValueClass(FloatWritable.class);

                job.setOutputKeyClass(IntWritable.class);
                job.setOutputValueClass(FloatWritable.class);

                job.setInputFormatClass(TextInputFormat.class);
                FileInputFormat.setInputPaths(job, new Path("d:/mrData/TongJi/input"));

                job.setOutputFormatClass(TextOutputFormat.class);
                FileOutputFormat.setOutputPath(job, new Path("d:/mrData/TongJi/output"));

                boolean res = job.waitForCompletion(true);
                System.exit(res ? 0 : 1);

      }

}


问题:我想问下当reducer端执行到这句代码的时候"int iCategory = K.get()",怎么将K转换成0~4了呢?,K不是0~15吗,难道调用.get()方法有什么玄机吗?

nextuser 发表于 2018-6-2 07:48:03

为什么map的代码都注释掉。虽然只看懂了一部分,但是下面似乎看懂了。
int iCategory = K.get() / LenFields; // restore the category from Keys.

假如K.get() 是16,那么LenFields是4,那么iCategory的结果就是 int iCategory=4

若余相思28 发表于 2018-6-2 09:53:22

nextuser 发表于 2018-6-2 07:48
为什么map的代码都注释掉。虽然只看懂了一部分,但是下面似乎看懂了。
int iCategory = K.get() / LenFi ...

我就是不懂你说这里,能解释下么,.get()方法不就是将value变为整形吗,怎么变成别的数字呢

arsenduan 发表于 2018-6-2 12:39:54

若余相思28 发表于 2018-6-2 09:53
我就是不懂你说这里,能解释下么,.get()方法不就是将value变为整形吗,怎么变成别的数字呢
楼主理解了第一步,第二步其实就是我们高中的取余数,O(∩_∩)O哈哈~
int iCategory = K.get() / LenFields;
看上面,LenFields=4的,这个没有问题吧
看这里
package com.xiaohong.TongJi;

public interface Parameter {
      /**
         * LenFields is the length of fields, or columns; it need to be modified for varied
         */
      public final int LenFields = 4;
      /**
         * iFields for min()
         * iFields for max()
         * iFields for sum()
         * iFields for sum2()
         */
      public final int LenStat = 4;

      public final int iMin = 0x80000000;
      public final int iMax = 0x7FFFFFFF;
      public final float fMin = (float) -3.4E38;
      public final float fMax = (float) 3.4E38;
}

那么任何数对4取余都超不过4的。

比如:
1/4取余是1
2/4取余是2
3/4取余是3
4/4取余是0
5/4取余是1
如此循环。都不会超过4





若余相思28 发表于 2018-6-2 13:09:46

arsenduan 发表于 2018-6-2 12:39
楼主理解了第一步,第二步其实就是我们高中的取余数,O(∩_∩)O哈哈~
int iCategory = K.get() / LenFie ...

我懂了
int iCategory = K.get() / LenFields;   这个并不是取余数
而是取整数,我们懂了,我一直把它看成取余数,我懂了,谢谢您哈!
页: [1]
查看完整版本: 大数据mapreduce疑惑