分享

Hadoop MapReduce纵表转横表 与 横表转纵表

本帖最后由 pig2 于 2014-9-9 23:42 编辑
问题导读

1、什么是数据库中的横表和纵表?
2、横表如何转为纵表?






输入数据如下:以\t分隔
  1. <span style="font-size:18px;">0-3岁育儿百科 书        23  
  2. 0-5v液位传感器  5  
  3. 0-5轴承 2  
  4. 0-6个月奶粉     23  
  5. 0-6个月奶粉c2c报告      23  
  6. 0-6个月奶粉在线购物排名 23  
  7. 0-6个月奶粉市场前景     23  
  8. 0-6个月配方奶粉 23  
  9. 0.001g电子天平  5  
  10. 0.01t化铝炉     2  
  11. 0.01吨熔铝合金炉        2  
  12. 0.03吨化镁炉    25  
  13. 0.03吨电磁炉    11  
  14. </span>
复制代码


其中左侧是搜索词,右侧是类别,可看成是数据库中的纵表,现需要将输入转成横表,即 类名\t语句1\t语句2...,这样的格式。
MapReduce最适合做这样的事情了。因为经常用到,记录一下。Hive表中的数据要转成横表的时候,单独写个MR来处理就很方便了。
  1. <span style="font-size:18px;">package seg;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.conf.Configured;  
  7. import org.apache.hadoop.fs.FileSystem;  
  8. import org.apache.hadoop.fs.Path;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.Reducer;  
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  16. import org.apache.hadoop.util.GenericOptionsParser;  
  17. import org.apache.hadoop.util.Tool;  
  18. import org.apache.hadoop.util.ToolRunner;  
  19.   
  20. /**
  21. * @author zhf  
  22. * @email zhf.thu@gmail.com
  23. * @version 创建时间:2014年8月24日 上午9:56:45
  24. */  
  25. public class Vertical2Horizontal extends Configured implements Tool{  
  26.     public static void main(String[] args) throws Exception {  
  27.         int exitCode = ToolRunner.run(new Vertical2Horizontal(), args);  
  28.         System.exit(exitCode);  
  29.     }  
  30.   
  31.     @Override  
  32.     public int run(String[] arg0) throws Exception {  
  33.         String[] args = new GenericOptionsParser(arg0).getRemainingArgs();  
  34.         if(args.length != 2){  
  35.             System.out.println("Usage:seg.Horizontal2Vertical <input> <output>");  
  36.             System.exit(1);  
  37.         }  
  38.         Configuration conf = new Configuration();  
  39.         FileSystem fs = FileSystem.get(conf);  
  40.         if(fs.exists(new Path(args[1])))  
  41.             fs.delete(new Path(args[1]),true);  
  42.         Job job = new Job(conf);  
  43.         job.setJarByClass(getClass());  
  44.         job.setMapperClass(HVMapper.class);  
  45.         job.setReducerClass(HVReducer.class);  
  46.         job.setMapOutputKeyClass(Text.class);  
  47.         job.setMapOutputValueClass(Text.class);  
  48.         job.setOutputKeyClass(Text.class);  
  49.         job.setOutputValueClass(Text.class);  
  50.         FileInputFormat.addInputPath(job, new Path(args[0]));  
  51.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  52.         return job.waitForCompletion(true) ? 0:1;  
  53.     }  
  54.   
  55.     public static class HVMapper extends Mapper<LongWritable,Text,Text,Text>{  
  56.         private Text text = new Text();  
  57.         private Text clazz = new Text();  
  58.         public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{  
  59.             String line = value.toString();  
  60.             String params[] = line.split("\t");  
  61.             text.set(params[0]);  
  62.             clazz.set(params[1]);  
  63.             context.write(clazz,text);  
  64.         }  
  65.     }  
  66.       
  67.     public static class HVReducer extends Reducer<Text,Text,Text,Text>{  
  68.         private Text result = new Text();  
  69.         public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{  
  70.             String tmp = "";  
  71.             for(Text val : values){  
  72.                 tmp += val + "\t";  
  73.             }  
  74.             result.set(tmp.trim());  
  75.             context.write(key, result);  
  76.         }  
  77.     }  
  78.   
  79. }  
  80. </span>  
复制代码



输出:
  1. <span style="font-size:18px;">1       莱舍万 服装美学 莱芜劳保服装    南京羽绒服特卖会        螃蟹的秘密品牌内衣店    螃蟹的秘密内衣专卖店</span>  
复制代码


今天又要用到了横表转纵表,记录一下。横表转纵表只需要一个Mapper就可以了,完全没有难度,就是把数据吹散开就好了。
  1. <span style="font-size:18px;">package seg;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.conf.Configured;  
  7. import org.apache.hadoop.fs.FileSystem;  
  8. import org.apache.hadoop.fs.Path;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  15. import org.apache.hadoop.util.GenericOptionsParser;  
  16. import org.apache.hadoop.util.Tool;  
  17. import org.apache.hadoop.util.ToolRunner;  
  18.   
  19. /**  
  20. * @ClassName: Horizontal2Vertical  
  21. * @Description: 横表转纵表
  22. * @date 2014年8月27日 下午2:01:35  
  23. *  
  24. */  
  25. public class Horizontal2Vertical extends Configured implements Tool {  
  26.     public static void main(String[] args) throws Exception {  
  27.         int exitCode = ToolRunner.run(new Horizontal2Vertical(), args);  
  28.         System.exit(exitCode);  
  29.     }  
  30.   
  31.     @Override  
  32.     public int run(String[] arg0) throws Exception {  
  33.         String[] args = new GenericOptionsParser(arg0).getRemainingArgs();  
  34.         if(args.length != 2){  
  35.             System.err.println("Usage : TableTransferMR <input> <output>");  
  36.         }  
  37.         Configuration conf = new Configuration();  
  38.         FileSystem fs = FileSystem.get(conf);  
  39.         if(fs.exists(new Path(args[1])))  
  40.             fs.delete(new Path(args[1]),true);  
  41.         Job job = new Job(conf);  
  42.         job.setJarByClass(Horizontal2Vertical.class);  
  43.         job.setMapperClass(TableMapper.class);  
  44.         job.setNumReduceTasks(0);  
  45.         job.setMapOutputKeyClass(Text.class);  
  46.         job.setMapOutputValueClass(Text.class);  
  47.         job.setOutputKeyClass(Text.class);  
  48.         job.setOutputValueClass(Text.class);  
  49.         FileInputFormat.addInputPath(job, new Path(args[0]));  
  50.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  51.         return job.waitForCompletion(true) ? 0:1;  
  52.     }  
  53.   
  54.     public static class TableMapper extends Mapper<LongWritable,Text,Text,Text>{  
  55.         public Text baseinfo = new Text();  
  56.         public Text filter = new Text();  
  57.         public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{  
  58.             String line = value.toString().trim();  
  59.             String[] params = line.split("\t");  
  60.             String dspid = params[0];  
  61.             String token = params[1];  
  62.             String userseq = params[2];  
  63.             String ip = params[3];  
  64.             String filters = params[8];  
  65.             String platform = params[9];  
  66.             baseinfo.set(dspid+"\t"+token+"\t"+userseq+"\t"+ip);  
  67.             String[] fs = filters.split("\\|");  
  68.             for(String f : fs){  
  69.                 filter.set(f+"\t"+platform);  
  70.                 context.write(baseinfo, filter);  
  71.             }  
  72.         }  
  73.     }  
  74. }  
  75. </span>  
复制代码






已有(3)人评论

跳转到指定楼层
anyhuayong 发表于 2014-9-10 08:36:08
好文章,谢谢楼主分享
回复

使用道具 举报

linhai1023 发表于 2014-9-10 09:46:58
谢谢楼主的分享
回复

使用道具 举报

sunny62520 发表于 2014-9-10 15:09:01
感谢分享,收藏了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条