分享

一道面试题的疑惑

Joker 发表于 2015-1-12 18:04:51 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 13962
本帖最后由 Joker 于 2015-1-12 18:11 编辑


测试数据
product_no lac_id moment start_time user_id county_id staytime city_id
13429100031 22554 8 2013-03-11 08:55:19.151754088 571 571 282 571
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 103 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100140 26642 9 2013-03-11 09:02:19.151754088 571 571 18 571
13429100082 22691 8 2013-03-11 08:57:32.151754088 571 571 287 571
13429100189 22558 8 2013-03-11 08:56:24.139539816 571 571 48 571
13429100349 22503 8 2013-03-11 08:54:30.152622440 571 571 211 571
字段解释:
product_no:用户手机号;
lac_id:用户所在基站;
start_time:用户在此基站的开始时间;
staytime:用户在此基站的逗留时间。
需求描述:
根据lac_id 和start_time 知道用户当时的位置,根据staytime 知道用户各个基站的逗留时
长。根据轨迹合并连续基站的staytime。
最终得到每一个用户按时间排序在每一个基站驻留时长
期望输出举例:
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 390 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571


我的代码
  1. public static void main(String[] args) throws Exception {
  2.                
  3.                 String[] files = {"ms","ms_out"};
  4.                 ToolRunner.run(new Configuration(), new JZ(), files );
  5.                
  6.         }
  7.        
  8.        
  9.         @Override
  10.         public int run(String[] args) throws Exception {
  11.                
  12.                 Configuration conf = new Configuration();
  13.                
  14.                 Job job = Job.getInstance(conf, "xx");
  15.                
  16.                 job.setJarByClass(JZ.class);
  17.                 job.setMapperClass(JZMapper.class);
  18.                 job.setReducerClass(JZReduce.class);
  19.                
  20.                 job.setOutputKeyClass(Text.class);
  21.                 job.setOutputValueClass(Text.class);
  22.                
  23.                 FileInputFormat.setInputPaths(job, new Path(args[0]));
  24.                 FileOutputFormat.setOutputPath(job, new Path(args[1]));
  25.                
  26.                 return job.waitForCompletion(true) ? 0 : 1;
  27.         }
  28.        
  29.         public static class JZMapper extends Mapper<Object, Text, Text, Text>{
  30.                
  31.                 @Override
  32.                 protected void map(Object key, Text value,Context context)
  33.                                 throws IOException, InterruptedException {
  34.                        
  35.                         String line = value.toString();
  36.                         String[] values = line.split(" ");
  37.                        
  38.                         if(values != null && values.length > 0){
  39.                                 String lac_id = values[1];
  40.                                 String phone = values[0];
  41.                                 context.write(new Text(phone + lac_id), new Text(line));
  42.                         }
  43.                 }
  44.                
  45.         }
  46.        
  47.         public static class JZReduce extends Reducer<Text, Text, Text, Text>{
  48.                
  49.                
  50.                 @Override
  51.                 protected void reduce(Text key, Iterable<Text> values,Context context)
  52.                                 throws IOException, InterruptedException {
  53.                         long staytime = 0;
  54.                         String line = "";
  55.                         StringBuffer info = new StringBuffer("");
  56.                         String[] datas = new String[line.split(" ").length];
  57.                        
  58.                         if(values != null){
  59.                                
  60.                                 for(Text val : values){
  61.                                        
  62.                                         line = val.toString();
  63.                                        
  64.                                         datas = line.split(" ");
  65.                                        
  66.                                         staytime += Long.parseLong(datas[7]);
  67.                                        
  68.                                        
  69.                                         datas[7] = staytime+"";
  70.                                        
  71.                                        
  72.                                 }
  73.                                
  74.                                 for(int x = 0; x < datas.length; x++){
  75.                                         info.append(datas[x] + " ");
  76.                                 }
  77.                                
  78.                                 info.deleteCharAt(info.length() - 1);
  79.                                 context.write(new Text(staytime + ""), new Text(info.toString()));       
  80.                                
  81.                         }
  82.                        
  83.                        
  84.                 }
  85.         }
  86.        
复制代码




已有(3)人评论

跳转到指定楼层
Joker 发表于 2015-1-12 18:11:50
我一直不能输出它需要的数据值,因为我找不到特性相同的。
回复

使用道具 举报

goldtimes 发表于 2015-1-12 19:13:20
Joker 发表于 2015-1-12 18:11
我一直不能输出它需要的数据值,因为我找不到特性相同的。

楼主没有分区和排序所以不会出现想要的结果。

首先不考虑实际的因素,如数据量、集群数量等
思路是这样的:

1.把相同的手机号,放到同一个reducer中,这个需要重写
分区函数例如下面代码,重写的目的就是让手机号,归于同一个reduce

  1. package com.cknote.hadoop.globlesort;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. public class MyPartitioner extends Partitioner<LongWritable, NullWritable> {
  6.         @Override
  7.         public int getPartition(LongWritable key, NullWritable value,
  8.                         int numPartitions) {
  9.                 long tmp = key.get();
  10.                 if (tmp <= 100) {
  11.                         return 0 % numPartitions;
  12.                 } else if (tmp <= 1000) {
  13.                         return 1 % numPartitions;
  14.                 } else {
  15.                         return 2 % numPartitions;
  16.                 }
  17.         }
  18. }
复制代码

详细参考

【mapreduce进阶编程五】全局排序


2.把同一个分区的手机号进行排序

排序是在reduce中进行的,可以使用全局排序,也就是【mapreduce进阶编程五】全局排序这个帖子中。

同样还有其他可以参考
MapReduce初级案例(2):使用MapReduce数据排序

hadoop mapreduce 统计次数及排序使用两个mapreduce












回复

使用道具 举报

pengsuyun 发表于 2015-1-13 11:08:46
很好,收藏了。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条