输入数据如下:以\t分隔
- 0-3岁育儿百科 书 23
- 0-5v液位传感器 5
- 0-5轴承 2
- 0-6个月奶粉 23
- 0-6个月奶粉c2c报告 23
- 0-6个月奶粉在线购物排名 23
- 0-6个月奶粉市场前景 23
- 0-6个月配方奶粉 23
- 0.001g电子天平 5
- 0.01t化铝炉 2
- 0.01吨熔铝合金炉 2
- 0.03吨化镁炉 25
- 0.03吨电磁炉 11
复制代码
其中左侧是搜索词,右侧是类别,可看成是数据库中的纵表,现需要将输入转成横表,即 类名\t语句1\t语句2...,这样的格式。
MapReduce最适合做这样的事情了。因为经常用到,记录一下。Hive表中的数据要转成横表的时候,单独写个MR来处理就很方便了。
- package seg;
-
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
-
- /**
-  * MapReduce job that turns a vertical (long) table into a horizontal (wide) one.
-  *
-  * <p>Each input line is split on tab; the first field is treated as the text and the
-  * second field as the category. All texts that share a category are joined with tabs
-  * and written as a single record keyed by that category.
-  *
-  * @author zhf
-  * @email zhf.thu@gmail.com
-  * @version created 2014-08-24 09:56:45
-  */
- public class Vertical2Horizontal extends Configured implements Tool{
-     public static void main(String[] args) throws Exception {
-         int exitCode = ToolRunner.run(new Vertical2Horizontal(), args);
-         System.exit(exitCode);
-     }
-
-     /**
-      * Configures and runs the job.
-      *
-      * @param arg0 command-line arguments; after generic Hadoop options are removed,
-      *             exactly two must remain: the input path and the output path
-      * @return 0 if the job succeeded, 1 on bad usage or job failure
-      * @throws Exception if option parsing, file-system access or job submission fails
-      */
-     @Override
-     public int run(String[] arg0) throws Exception {
-         // Reuse the configuration handed over by ToolRunner so generic options
-         // (-D, -conf, ...) are honoured; fall back to a fresh one when run() is
-         // called directly without a configuration having been set.
-         Configuration conf = getConf();
-         if (conf == null) {
-             conf = new Configuration();
-         }
-         String[] args = new GenericOptionsParser(conf, arg0).getRemainingArgs();
-         if (args.length != 2) {
-             System.err.println("Usage: seg.Vertical2Horizontal <input> <output>");
-             // Return instead of System.exit so the caller decides how to terminate.
-             return 1;
-         }
-         Path input = new Path(args[0]);
-         Path output = new Path(args[1]);
-
-         // An existing output directory is removed recursively before the job starts.
-         // Resolve the file system from the path itself so non-default schemes work.
-         FileSystem fs = output.getFileSystem(conf);
-         if (fs.exists(output)) {
-             fs.delete(output, true);
-         }
-
-         Job job = new Job(conf);
-         job.setJarByClass(getClass());
-         job.setMapperClass(HVMapper.class);
-         job.setReducerClass(HVReducer.class);
-         job.setMapOutputKeyClass(Text.class);
-         job.setMapOutputValueClass(Text.class);
-         job.setOutputKeyClass(Text.class);
-         job.setOutputValueClass(Text.class);
-         FileInputFormat.addInputPath(job, input);
-         FileOutputFormat.setOutputPath(job, output);
-         return job.waitForCompletion(true) ? 0 : 1;
-     }
-
-     /**
-      * Emits (category, text) for every input line of the form {@code text \t category}.
-      */
-     public static class HVMapper extends Mapper<LongWritable,Text,Text,Text>{
-         private final Text text = new Text();
-         private final Text clazz = new Text();
-
-         /**
-          * Splits the line on tab and writes the second field as key and the first
-          * field as value. Lines with fewer than two fields are counted and skipped
-          * rather than failing the task.
-          */
-         @Override
-         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-             String[] params = value.toString().split("\t");
-             if (params.length < 2) {
-                 context.getCounter("Vertical2Horizontal", "MALFORMED_LINES").increment(1);
-                 return;
-             }
-             text.set(params[0]);
-             clazz.set(params[1]);
-             context.write(clazz, text);
-         }
-     }
-
-     /**
-      * Joins all texts of one category into a single tab-separated value.
-      */
-     public static class HVReducer extends Reducer<Text,Text,Text,Text>{
-         private final Text result = new Text();
-
-         /**
-          * Appends every value followed by a tab, trims leading/trailing whitespace
-          * from the joined string, and writes it under the category key.
-          */
-         @Override
-         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-             // StringBuilder avoids the quadratic cost of repeated String concatenation
-             // when a category has many values.
-             StringBuilder joined = new StringBuilder();
-             for (Text val : values) {
-                 joined.append(val.toString()).append('\t');
-             }
-             result.set(joined.toString().trim());
-             context.write(key, result);
-         }
-     }
-
- }
复制代码
输出:
- 莱舍万 服装美学 莱芜劳保服装 南京羽绒服特卖会 螃蟹的秘密品牌内衣店 螃蟹的秘密内衣专卖店
复制代码
今天又要用到横表转纵表了,记录一下。横表转纵表只需要一个Mapper就可以了,完全没有难度,就是把数据拆散开就好了。
- package seg;
-
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
-
- /**
-  * Map-only MapReduce job that turns a horizontal (wide) table into a vertical (long) one.
-  *
-  * <p>Each input line is split on tab. The field at index 8 is split on {@code |}, and
-  * one output record is written per piece, so a single input row fans out into several
-  * output rows.
-  *
-  * @ClassName: Horizontal2Vertical
-  * @Description: horizontal table to vertical table
-  * @date 2014-08-27 14:01:35
-  *
-  */
- public class Horizontal2Vertical extends Configured implements Tool {
-     public static void main(String[] args) throws Exception {
-         int exitCode = ToolRunner.run(new Horizontal2Vertical(), args);
-         System.exit(exitCode);
-     }
-
-     /**
-      * Configures and runs the map-only job.
-      *
-      * @param arg0 command-line arguments; after generic Hadoop options are removed,
-      *             exactly two must remain: the input path and the output path
-      * @return 0 if the job succeeded, 1 on bad usage or job failure
-      * @throws Exception if option parsing, file-system access or job submission fails
-      */
-     @Override
-     public int run(String[] arg0) throws Exception {
-         // Reuse the configuration handed over by ToolRunner so generic options
-         // (-D, -conf, ...) are honoured; fall back to a fresh one when run() is
-         // called directly without a configuration having been set.
-         Configuration conf = getConf();
-         if (conf == null) {
-             conf = new Configuration();
-         }
-         String[] args = new GenericOptionsParser(conf, arg0).getRemainingArgs();
-         if (args.length != 2) {
-             System.err.println("Usage: seg.Horizontal2Vertical <input> <output>");
-             // Stop here: continuing would index past the end of args below.
-             return 1;
-         }
-         Path input = new Path(args[0]);
-         Path output = new Path(args[1]);
-
-         // An existing output directory is removed recursively before the job starts.
-         // Resolve the file system from the path itself so non-default schemes work.
-         FileSystem fs = output.getFileSystem(conf);
-         if (fs.exists(output)) {
-             fs.delete(output, true);
-         }
-
-         Job job = new Job(conf);
-         job.setJarByClass(Horizontal2Vertical.class);
-         job.setMapperClass(TableMapper.class);
-         // Zero reduce tasks: the mapper output is the job output.
-         job.setNumReduceTasks(0);
-         job.setMapOutputKeyClass(Text.class);
-         job.setMapOutputValueClass(Text.class);
-         job.setOutputKeyClass(Text.class);
-         job.setOutputValueClass(Text.class);
-         FileInputFormat.addInputPath(job, input);
-         FileOutputFormat.setOutputPath(job, output);
-         return job.waitForCompletion(true) ? 0 : 1;
-     }
-
-     /**
-      * Fans one wide row out into one record per {@code |}-separated entry of field 8.
-      */
-     public static class TableMapper extends Mapper<LongWritable,Text,Text,Text>{
-         public Text baseinfo = new Text();
-         public Text filter = new Text();
-
-         /**
-          * Trims the line, splits it on tab, and for every {@code |}-separated piece of
-          * field 8 writes key = fields 0-3 joined by tab, value = piece, tab, field 9.
-          * Lines with fewer than ten fields are counted and skipped rather than
-          * failing the task.
-          */
-         @Override
-         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-             String line = value.toString().trim();
-             String[] params = line.split("\t");
-             // Index 9 is the highest one read below.
-             if (params.length < 10) {
-                 context.getCounter("Horizontal2Vertical", "MALFORMED_LINES").increment(1);
-                 return;
-             }
-             String dspid = params[0];
-             String token = params[1];
-             String userseq = params[2];
-             String ip = params[3];
-             String filters = params[8];
-             String platform = params[9];
-             baseinfo.set(dspid + "\t" + token + "\t" + userseq + "\t" + ip);
-             for (String f : filters.split("\\|")) {
-                 filter.set(f + "\t" + platform);
-                 context.write(baseinfo, filter);
-             }
-         }
-     }
- }
复制代码
Hadoop MapReduce纵表转横表 与 横表转纵表
|