本帖最后由 Joker 于 2015-1-12 18:11 编辑
测试数据
product_no lac_id moment start_time user_id county_id staytime city_id
13429100031 22554 8 2013-03-11 08:55:19.151754088 571 571 282 571
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 103 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100140 26642 9 2013-03-11 09:02:19.151754088 571 571 18 571
13429100082 22691 8 2013-03-11 08:57:32.151754088 571 571 287 571
13429100189 22558 8 2013-03-11 08:56:24.139539816 571 571 48 571
13429100349 22503 8 2013-03-11 08:54:30.152622440 571 571 211 571
字段解释:
product_no:用户手机号;
lac_id:用户所在基站;
start_time:用户在此基站的开始时间;
staytime:用户在此基站的逗留时间。
需求描述:
根据lac_id 和start_time 知道用户当时的位置,根据staytime 知道用户各个基站的逗留时
长。根据轨迹合并连续基站的staytime。
最终得到每一个用户按时间排序在每一个基站驻留时长
期望输出举例:
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 390 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
我的代码
- public static void main(String[] args) throws Exception {
-
- String[] files = {"ms","ms_out"};
- ToolRunner.run(new Configuration(), new JZ(), files );
-
- }
-
-
- @Override
- public int run(String[] args) throws Exception {
-
- Configuration conf = new Configuration();
-
- Job job = Job.getInstance(conf, "xx");
-
- job.setJarByClass(JZ.class);
- job.setMapperClass(JZMapper.class);
- job.setReducerClass(JZReduce.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
-
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
- return job.waitForCompletion(true) ? 0 : 1;
- }
-
- public static class JZMapper extends Mapper<Object, Text, Text, Text>{
-
- @Override
- protected void map(Object key, Text value,Context context)
- throws IOException, InterruptedException {
-
- String line = value.toString();
- String[] values = line.split(" ");
-
- if(values != null && values.length > 0){
- String lac_id = values[1];
- String phone = values[0];
- context.write(new Text(phone + lac_id), new Text(line));
- }
- }
-
- }
-
- public static class JZReduce extends Reducer<Text, Text, Text, Text>{
-
-
- @Override
- protected void reduce(Text key, Iterable<Text> values,Context context)
- throws IOException, InterruptedException {
-
- long staytime = 0;
- String line = "";
- StringBuffer info = new StringBuffer("");
- String[] datas = new String[line.split(" ").length];
-
- if(values != null){
-
- for(Text val : values){
-
- line = val.toString();
-
- datas = line.split(" ");
-
- staytime += Long.parseLong(datas[7]);
-
-
- datas[7] = staytime+"";
-
-
- }
-
- for(int x = 0; x < datas.length; x++){
- info.append(datas[x] + " ");
- }
-
- info.deleteCharAt(info.length() - 1);
- context.write(new Text(staytime + ""), new Text(info.toString()));
-
- }
-
-
- }
- }
-
复制代码
|