After learning HBase and Hadoop, it's hard not to wonder how to feed the data in an HTable into a MapReduce job, and how to write the reduce output back into an HTable.
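The job below scans a source table "huser" and writes a copy into "huserCopy"; both tables have a single column family "user". If you haven't created them yet, a minimal setup sketch (using the classic HBaseAdmin/HTable client API; the row key and values are just sample data) could look like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SetupTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = SimpleHbase.getConf(); // or HBaseConfiguration.create()
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Create the source and destination tables, each with one family "user"
        for (String name : new String[] { "huser", "huserCopy" }) {
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("user"));
                admin.createTable(desc);
            }
        }
        // Seed one sample row so the job has something to copy
        HTable table = new HTable(conf, "huser");
        Put p = new Put(Bytes.toBytes("row1"));
        p.add(Bytes.toBytes("user"), Bytes.toBytes("username"), Bytes.toBytes("xqb"));
        p.add(Bytes.toBytes("user"), Bytes.toBytes("age"), Bytes.toBytes("31"));
        p.add(Bytes.toBytes("user"), Bytes.toBytes("sex"), Bytes.toBytes("male"));
        table.put(p);
        table.close();
        admin.close();
    }
}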
With the tables in place, straight to the code:
package com.org.mapreduce.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.org.hbase.SimpleHbase;
/**
 * Reads data from HBase in the map phase and writes it back to HBase in the reduce phase.
 * @author hadoop
 */
public class MapFromHbase {
public static class Map extends TableMapper<ImmutableBytesWritable, Text> {
    private final byte[] cf = Bytes.toBytes("user");
    private final byte[] username = Bytes.toBytes("username");
    private final byte[] age = Bytes.toBytes("age");
    private final byte[] sex = Bytes.toBytes("sex");

    // Read each row of "huser" and emit it as a string like "username:xqb|age:31|sex:male"
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        context.write(key, new Text(
                "username:" + Bytes.toString(value.getValue(cf, username)) +
                "|age:" + Bytes.toString(value.getValue(cf, age)) +
                "|sex:" + Bytes.toString(value.getValue(cf, sex))));
    }
}
// Parse each mapped value and store its columns in the table "huserCopy"
public static class Reduce extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text v : values) {
            // Reuse the source row key; key.get() returns the raw bytes,
            // whereas key.toString() would not
            Put p = new Put(key.get());
            for (String col : v.toString().split("\\|")) {
                String[] kv = col.split(":", 2);
                p.add(Bytes.toBytes("user"), Bytes.toBytes(kv[0]), Bytes.toBytes(kv[1]));
            }
            // TableOutputFormat takes the row key from the Put, so a null key is fine here
            context.write(null, p);
        }
    }
}
public static class Driver extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
Configuration conf = SimpleHbase.getConf();
Job job = new Job(conf,"MapFromHbase");
job.setJarByClass(MapFromHbase.class);
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);
TableMapReduceUtil.initTableMapperJob("huser", scan,
Map.class, ImmutableBytesWritable.class, Text.class, job);
TableMapReduceUtil.initTableReducerJob("huserCopy", Reduce.class, job);
return job.waitForCompletion(true) ? 0 : 1;
}
}
public static void main(String[] args) throws Exception {
    // ToolRunner parses the generic Hadoop options (-D, -conf, ...) before calling run()
    System.exit(ToolRunner.run(new Driver(), args));
}
}
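SimpleHbase.getConf() is just a small helper that returns an HBase client Configuration. If you don't have it, a minimal stand-in sketch (the ZooKeeper quorum address here is a placeholder; point it at your own cluster) could be:

package com.org.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class SimpleHbase {
    public static Configuration getConf() {
        // Loads hbase-default.xml / hbase-site.xml from the classpath
        Configuration conf = HBaseConfiguration.create();
        // Placeholder quorum; replace with your cluster's ZooKeeper hosts
        conf.set("hbase.zookeeper.quorum", "localhost");
        return conf;
    }
}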
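After the job finishes, you can verify the copy with a simple scan of "huserCopy" (a quick sketch using the classic HTable client):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyCopy {
    public static void main(String[] args) throws Exception {
        Configuration conf = SimpleHbase.getConf();
        HTable table = new HTable(conf, "huserCopy");
        ResultScanner scanner = table.getScanner(new Scan());
        // Print each row's username/age/sex as written by the reducer
        for (Result r : scanner) {
            System.out.println(Bytes.toString(r.getRow()) + " => "
                    + "username:" + Bytes.toString(r.getValue(Bytes.toBytes("user"), Bytes.toBytes("username")))
                    + "|age:" + Bytes.toString(r.getValue(Bytes.toBytes("user"), Bytes.toBytes("age")))
                    + "|sex:" + Bytes.toString(r.getValue(Bytes.toBytes("user"), Bytes.toBytes("sex"))));
        }
        scanner.close();
        table.close();
    }
}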