需求说明:统计每个手机号在数据中出现的总次数
案例数据
(此处原为案例数据截图,图片为本地临时文件,未随文导出。由下文程序可知:数据每行以制表符分隔,第三列为手机号。)
程序
Mapper
package com.fish.had.cellphonenumbercount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class CellPhoneNumberMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
final LongWritable ONE = new LongWritable(1L);
@Override protected void map(LongWritable k1, Text v1, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { String v = v1.toString(); if(null != v && !"".equals(v)){ String[] ss = v.split("\t");//2 if(ss.length > 2){ if(ss[2].length() == 11 && ss[2].indexOf(".") < 0){ context.write(new Text(ss[2]), ONE); } } } } } 说明: 1、根据案例数据发现,第三列为需要的数据 if(null != v && !"".equals(v)){ String[] ss = v.split("\t");//2 if(ss.length > 2){ 2、判断数据,取11位。通过 . 过滤11位的ip if(ss[2].length() == 11 && ss[2].indexOf(".") < 0){
Combiner
package com.fish.had.cellphonenumbercount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
/**
 * Sums the {@code LongWritable} counts received for a key and writes a single
 * {@code (key, sum)} pair.
 */
public class CellPhoneNumberCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    /** Returns the sum of all values in {@code counts}. */
    private static long total(Iterable<LongWritable> counts) {
        long acc = 0L;
        for (LongWritable count : counts) {
            acc += count.get();
        }
        return acc;
    }

    /**
     * Collapses all counts for one key into their sum.
     *
     * @param k2 the key whose counts are being combined
     * @param v2s the counts received for {@code k2}
     * @param context sink for the {@code (k2, sum)} pair
     * @throws IOException if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
        context.write(k2, new LongWritable(total(v2s)));
    }
}
Reducer
package com.fish.had.cellphonenumbercount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
/**
 * Adds up every count received for a key and writes one {@code (key, total)}
 * pair per key.
 */
public class CellPhoneNumberReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Writes the sum of {@code v2s} under {@code k2}.
     *
     * @param k2 the key being reduced
     * @param v2s the counts received for {@code k2}
     * @param context sink for the {@code (k2, total)} pair
     * @throws IOException if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
        long occurrences = 0L;
        for (LongWritable partial : v2s) {
            occurrences = occurrences + partial.get();
        }

        LongWritable result = new LongWritable(occurrences);
        context.write(k2, result);
    }
}
Main
package com.fish.had.cellphonenumbercount;
import java.net.URI;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
/**
 * Driver that configures and runs the phone-number counting job.
 *
 * <p>Expects exactly two arguments after any generic Hadoop options: the input
 * path and the output path. An existing output path is deleted recursively
 * before the job is submitted.
 */
public class CellPhoneNumberMain extends Configured implements Tool {

    /**
     * Runs the tool through {@link ToolRunner} and exits with its return code.
     *
     * @param args command-line arguments, optionally including generic Hadoop options
     * @throws Exception if the tool throws
     */
    public static void main(String[] args) throws Exception {
        // Propagate the tool's result so a failed job yields a non-zero exit status.
        System.exit(ToolRunner.run(new CellPhoneNumberMain(), args));
    }

    /**
     * Configures the job, submits it and waits for it to finish.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @return 0 if the job completed successfully, 1 on bad arguments or job failure
     * @throws Exception if filesystem access or job submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Checked here rather than in main() so that generic options already
        // consumed by ToolRunner do not count against the expected two arguments.
        if (args.length != 2) {
            System.err.print("param is two ,not other\n");
            return 1;
        }

        Path inputPath = new Path(args[0]);
        Path outPutPath = new Path(args[1]);

        // Use the configuration injected by ToolRunner; fall back to a fresh one
        // if run() is invoked directly without setConf().
        Configuration conf = getConf() != null ? getConf() : new Configuration();

        // Delete the output directory if it already exists. The filesystem is
        // resolved from the output path itself, not from the input path.
        FileSystem fs = outPutPath.getFileSystem(conf);
        if (fs.exists(outPutPath)) {
            fs.delete(outPutPath, true);
        }

        // Create the job.
        Job job = Job.getInstance(conf, CellPhoneNumberMain.class.getSimpleName());
        // Class used to locate the job jar.
        job.setJarByClass(CellPhoneNumberMain.class);

        // 1. Input and output directories.
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPutPath);

        // 2. Input and output format classes.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 3. Custom Mapper and Reducer classes.
        job.setMapperClass(CellPhoneNumberMapper.class);
        job.setReducerClass(CellPhoneNumberReducer.class);

        // 3.1 Map output <k, v> types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 3.2 Reducer output <k, v> types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 4. Partitioning.
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 5. Combiner.
        job.setCombinerClass(CellPhoneNumberCombiner.class);

        // 6. Submit the job and block until it finishes.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
生成jar
运行
查看结果
|