fish_tx 发表于 2015-10-15 19:30:16

hadoop(大数据)统计相同手机号的总数

需求说明:找相同手机号出现的总数
案例数据
file:///C:\Users\ADMINI~1\AppData\Local\Temp\ksohtml\wpsAF48.tmp.jpg
程序
Mapperpackage com.fish.had.cellphonenumbercount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
public class CellPhoneNumberMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
      final LongWritable ONE = new LongWritable(1L);
      @Override      protected void map(LongWritable k1, Text v1,                        Mapper<LongWritable, Text, Text, LongWritable>.Context context)                        throws IOException, InterruptedException {                String v = v1.toString();                if(null != v && !"".equals(v)){                        String[] ss = v.split("\t");//2                        if(ss.length > 2){                              if(ss.length() == 11 && ss.indexOf(".") < 0){                                        context.write(new Text(ss), ONE);                              }                        }                }      }}说明:1、根据案例数据发现,第三列为需要的数据if(null != v && !"".equals(v)){                        String[] ss = v.split("\t");//2                        if(ss.length > 2){2、判断数据,取11位。通过 . 过滤11位的ipif(ss.length() == 11 && ss.indexOf(".") < 0){
Combinerpackage com.fish.had.cellphonenumbercount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
public class CellPhoneNumberCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
      @Override      protected void reduce(Text k2, Iterable<LongWritable> v2s,                        Reducer<Text, LongWritable, Text, LongWritable>.Context context)                        throws IOException, InterruptedException {                long sum = 0L;                for (LongWritable v2 : v2s){                        sum += v2.get();                }                context.write(k2, new LongWritable(sum));      }}

Reducerpackage com.fish.had.cellphonenumbercount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
public class CellPhoneNumberReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
      @Override      protected void reduce(Text k2, Iterable<LongWritable> v2s,                        Reducer<Text, LongWritable, Text, LongWritable>.Context context)                        throws IOException, InterruptedException {                long sum = 0L;                for (LongWritable v2 : v2s){                        sum += v2.get();                }                context.write(k2, new LongWritable(sum));      }}
Mainpackage com.fish.had.cellphonenumbercount;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;

public class CellPhoneNumberMain extends Configured implements Tool{
      public static void main(String[] args) throws Exception {                if(args.length != 2){                        System.err.print("param is two ,not other\n");                        System.exit(1);                }                ToolRunner.run(new CellPhoneNumberMain(), args);      }
      @Override      public int run(String[] args) throws Exception {                String INPUTPATH = args;                String OUTPUTPATH = args;                Path outPutPath = new Path(OUTPUTPATH);
                //判断输出目录是否exist,若存在,即删除                Configuration conf = new Configuration();                FileSystem fs = FileSystem.get(new URI(INPUTPATH), conf);                if(fs.exists(outPutPath)) fs.delete(outPutPath, true);
                //生成job                @SuppressWarnings("deprecation")                Job job = new Job(conf, CellPhoneNumberMain.class.getSimpleName());                //指定jar的class                job.setJarByClass(CellPhoneNumberMain.class);                //1.指定输入/输出目录                FileInputFormat.setInputPaths(job, new Path(INPUTPATH));                FileOutputFormat.setOutputPath(job, outPutPath);                //2.指定对输入/输出数据进行格式化的类                job.setInputFormatClass(TextInputFormat.class);                job.setOutputFormatClass(TextOutputFormat.class);                //3.指定自定义的Mapper/Reducer类                job.setMapperClass(CellPhoneNumberMapper.class);                job.setReducerClass(CellPhoneNumberReducer.class);                //3.1指定map输出的<k,v>类型                job.setMapOutputKeyClass(Text.class);                job.setMapOutputValueClass(LongWritable.class);                //3.2指定reducer输出<k,v>类型                job.setOutputKeyClass(Text.class);                job.setOutputValueClass(LongWritable.class);                //4.分区                job.setPartitionerClass(HashPartitioner.class);                job.setNumReduceTasks(1);//0                //5.归约                job.setCombinerClass(CellPhoneNumberCombiner.class);                //6.把作业交给jobTracker运行                job.waitForCompletion(true);
                return 0;      }
}
生成jar


运行

查看结果




页: [1]
查看完整版本: hadoop(大数据)统计相同手机号的总数