分享

MapReduce程式設計問題

xiaolongwu1987 发表于 2013-10-26 15:15:46 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 5082
下面是我做的MapReduce的程式碼
在Map{ } 裡我是做  讀取HBase的資料
當我的搜尋引擎有人輸入了一個關鍵字   "xxxxx"
那我要怎麼做, 才能把 "xxxxx"丟進 Map{ }中
使得 "xxxxx"可以和 我從HBase讀取出來的資料做比對
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LoadHBase{
        public static class HtMap extends TableMapper {
                public void map(ImmutableBytesWritable key, Result value,Context context)throws IOException, InterruptedException {
                        String res = Bytes.toString(value.getValue(Bytes.toBytes("hardware"),Bytes.toBytes("inlat")));                        
                        context.write(new Text(key.toString()), new Text(res));
                }              
        }
        public static class HtReduce extends Reducer {
                public void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {
                        String str = new String("");
                        Text final_key = new Text(key);
                        Text final_value = new Text();
                        for (Text tmp : values) {
                                str += tmp.toString();        
                        }
                        final_value.set(str);               
                        context.write(final_key, final_value);
                }
        }
        public LoadHBase() throws Exception {
                String input = "output2";        
                String tablename = "machine";               
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(conf);
                fs.delete(new Path(input),true);
                Job job = new Job(conf, " hbase data to hdfs");
                job.setJarByClass(LoadHBase.class);
                job.setNumReduceTasks(1);
                Scan myScan = new Scan();
                TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class,Text.class, Text.class, job);
                job.setReducerClass(HtReduce.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                FileOutputFormat.setOutputPath(job, new Path(input));
                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
}
                                       
                                                                                                                                                                                                               

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条