The Parameter interface:
[mw_shl_code=java,true]package com.xiaohong.TongJi;
public interface Parameter {
    /**
     * LenFields is the number of fields (columns) per input record;
     * change it if the input has a different number of columns.
     */
    public final int LenFields = 4;
    /**
     * LenStat is the number of statistics kept per field. The key rows
     * are: row 0 for min(), row 1 for max(), row 2 for sum(), row 3 for sum2().
     */
    public final int LenStat = 4;
    public final int iMin = 0x80000000;        // Integer.MIN_VALUE
    public final int iMax = 0x7FFFFFFF;        // Integer.MAX_VALUE
    public final float fMin = (float) -3.4E38; // approximately -Float.MAX_VALUE
    public final float fMax = (float) 3.4E38;  // approximately Float.MAX_VALUE
}
[/mw_shl_code]
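A side note on the sentinel constants above, since they look a bit magic: 0x80000000 and 0x7FFFFFFF are exactly Integer.MIN_VALUE and Integer.MAX_VALUE, and ±3.4E38 approximates ±Float.MAX_VALUE. A standalone check (plain JDK, purely illustrative, not part of the original code):
[mw_shl_code=java,true]public class SentinelCheck {
    public static void main(String[] args) {
        System.out.println(0x80000000 == Integer.MIN_VALUE); // true
        System.out.println(0x7FFFFFFF == Integer.MAX_VALUE); // true
        System.out.println(Float.MAX_VALUE); // 3.4028235E38, slightly above 3.4E38
        // Caution: Float.MIN_VALUE is the smallest POSITIVE float (~1.4E-45),
        // so the most negative float is -Float.MAX_VALUE; fMin approximates that.
    }
}
[/mw_shl_code]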
The map side:
[mw_shl_code=java,true]package com.xiaohong.TongJi;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TongJiMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> implements Parameter {
    /**
     * iFields[][] holds the pre-built output keys for each statistic of each
     * input field: row 0 for min(), row 1 for max(), row 2 for sum(),
     * row 3 for sum2(). The key value is (row * LenFields + column).
     */
    private static final IntWritable[][] iFields = new IntWritable[LenStat][LenFields];
    private static final float[] min = new float[LenFields];
    private static final float[] max = new float[LenFields];
    /** Key LenStat * LenFields (= 16) is reserved for the record count. */
    private static final IntWritable iwCnt = new IntWritable(LenStat * LenFields);
    private static final FloatWritable one = new FloatWritable(1);

    public TongJiMapper() {
        for (int i = 0; i < LenStat; i++) {
            for (int j = 0; j < LenFields; j++) {
                iFields[i][j] = new IntWritable(i * LenFields + j);
            }
        }
        for (int j = 0; j < LenFields; j++) {
            min[j] = fMax; // start at the largest float so any value lowers it
            max[j] = fMin; // start at the most negative float so any value raises it
        }
    }

    @Override
    protected void map(LongWritable key, Text values,
            Mapper<LongWritable, Text, IntWritable, FloatWritable>.Context context)
            throws IOException, InterruptedException {
        /*
         * Sample input data:
         * 9.0000000e+000 1.0000000e+000 2.0000000e+000 4.0000000e+000
         * 3.0000000e+000 8.0000000e+000 4.0000000e+000 8.0000000e+000
         * 6.0000000e+000 5.0000000e+000 9.0000000e+000 1.0000000e+000
         * 5.0000000e+000 6.0000000e+000 9.0000000e+000 2.0000000e+000
         * 9.0000000e+000 8.0000000e+000 4.0000000e+000 2.0000000e+000
         * 7.0000000e+000 9.0000000e+000 9.0000000e+000 2.0000000e+000
         * 5.0000000e+000 7.0000000e+000 1.0000000e+000 6.0000000e+000
         */
        StringTokenizer st = new StringTokenizer(values.toString().toLowerCase(), " \t,;");
        float iTmp;
        for (int j = 0; j < LenFields; j++) { // handle each field
            iTmp = Float.parseFloat(st.nextToken());
            /*
             * For min(): emit only when the running minimum improves. Over
             * 100,000 records this emits only about 37 <key,value> pairs
             * per field instead of one per record, cutting shuffle traffic.
             */
            if (min[j] > iTmp) {
                min[j] = iTmp;
                context.write(iFields[0][j], new FloatWritable(min[j]));
            }
            if (max[j] < iTmp) { // for max(): same trick as min()
                max[j] = iTmp;
                context.write(iFields[1][j], new FloatWritable(max[j]));
            }
            context.write(iFields[2][j], new FloatWritable(iTmp));        // for sum()
            context.write(iFields[3][j], new FloatWritable(iTmp * iTmp)); // for sum2()
        }
        context.write(iwCnt, one); // for cnt()
    }
}
[/mw_shl_code]
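To see what map() does with a single record, here is a standalone sketch (plain JDK, no Hadoop required; the class name is made up for illustration) that tokenizes the first sample line the same way the mapper does:
[mw_shl_code=java,true]import java.util.StringTokenizer;

public class TokenizeDemo {
    public static void main(String[] args) {
        String line = "9.0000000e+000 1.0000000e+000 2.0000000e+000 4.0000000e+000";
        // Same delimiters as the mapper: spaces, tabs, commas, semicolons.
        StringTokenizer st = new StringTokenizer(line.toLowerCase(), " \t,;");
        int j = 0;
        while (st.hasMoreTokens()) {
            // Same parsing as the mapper: each token is one field value.
            float v = Float.parseFloat(st.nextToken());
            System.out.printf("field %d = %.1f%n", j++, v);
        }
        // Prints: field 0 = 9.0, field 1 = 1.0, field 2 = 2.0, field 3 = 4.0
    }
}
[/mw_shl_code]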
The reduce side:
[mw_shl_code=java,true]package com.xiaohong.TongJi;
import java.io.IOException;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class TongJiReducer extends Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>
        implements Parameter {
    @Override
    protected void reduce(IntWritable K, Iterable<FloatWritable> values,
            Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>.Context context)
            throws IOException, InterruptedException {
        float min = fMax; // start high, consistent with the mapper
        float max = fMin; // start low, consistent with the mapper
        float sum = 0;
        long iCnt = 0;
        // Integer division recovers the statistic row (0..4) from the key.
        int iCategory = K.get() / LenFields;
        switch (iCategory) {
        case 0: // min()
            for (FloatWritable value : values) {
                if (min > value.get()) {
                    min = value.get();
                }
            }
            context.write(K, new FloatWritable(min));
            break;
        case 1: // max()
            for (FloatWritable value : values) {
                if (max < value.get()) {
                    max = value.get();
                }
            }
            context.write(K, new FloatWritable(max));
            break;
        case 2: // sum()
        case 3: // sum2(): the mapper already squared the values, so the same summation works
            for (FloatWritable value : values) {
                sum += value.get();
            }
            context.write(K, new FloatWritable(sum));
            break;
        case 4: // cnt()
            for (FloatWritable value : values) {
                iCnt += value.get();
            }
            context.write(K, new FloatWritable(iCnt));
            break;
        } // switch
    }
}
[/mw_shl_code]
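The job itself only emits min, max, sum, sum2 and cnt; if mean and variance per field are the end goal, they can be derived from those outputs afterwards. A hypothetical post-processing sketch (not part of the job; class name made up; the numbers are column 0 of the sample data above):
[mw_shl_code=java,true]public class StatsPostProcess {
    public static void main(String[] args) {
        // Reducer outputs for column 0 of the sample data:
        float sum = 44.0f;   // key 8  (row 2: sum)
        float sum2 = 306.0f; // key 12 (row 3: sum of squares)
        long cnt = 7;        // key 16 (record count)

        float mean = sum / cnt;
        // Population variance via E[X^2] - (E[X])^2.
        float variance = sum2 / cnt - mean * mean;
        System.out.printf("mean=%.4f variance=%.4f%n", mean, variance); // mean=6.2857 variance=4.2041
    }
}
[/mw_shl_code]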
The YARN client:
[mw_shl_code=java,true]package com.xiaohong.TongJi;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class JobSubMitter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubMitter.class);
        job.setMapperClass(TongJiMapper.class);
        job.setReducerClass(TongJiReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("d:/mrData/TongJi/input"));
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("d:/mrData/TongJi/output"));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
[/mw_shl_code]
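The hard-coded d:/mrData paths only work for a local run. A common variant (a sketch only, with a made-up class name, not from the original post) takes the paths from the command line and removes a stale output directory so reruns don't fail with an "output directory already exists" error:
[mw_shl_code=java,true]package com.xiaohong.TongJi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubMitter2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubMitter2.class);
        job.setMapperClass(TongJiMapper.class);
        job.setReducerClass(TongJiReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);

        // Paths come from the command line: args[0] = input, args[1] = output.
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        // Delete any previous output so the job can be re-run.
        FileSystem fs = output.getFileSystem(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/mw_shl_code]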
Question: when the reducer executes the line "int iCategory = K.get() / LenFields", how does K get converted into the range 0~4? Aren't the keys 0~16? Is there some hidden trick in the .get() method?
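A quick way to see what is going on there (an illustrative decode using the same constants, not part of the original post): .get() has no magic at all, it simply unboxes the IntWritable to a plain int; it is the integer division by LenFields on that same line that collapses the 17 keys (0~15 for the statistics plus 16 for the count) into the 5 categories:
[mw_shl_code=java,true]package com.xiaohong.TongJi;
public class KeyDecodeDemo implements Parameter {
    public static void main(String[] args) {
        // Keys 0..15 encode (row * LenFields + column); key 16 is the count.
        for (int key = 0; key <= LenStat * LenFields; key++) {
            int category = key / LenFields; // 0=min, 1=max, 2=sum, 3=sum2, 4=cnt
            int column = key % LenFields;   // which input field (unused for key 16)
            System.out.printf("key=%2d -> category=%d column=%d%n", key, category, column);
        }
    }
}
[/mw_shl_code]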