实现：
通过 MapReduce 将 HDFS 中的数据插入到 HBase 中。
注：Map 输出的值使用了自定义的 Writable 序列化类（CellphoneWritable）。
package com.fish.mr2hb;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.log4j.Logger;
/**
 * MapReduce driver that loads tab-separated {@code phone<TAB>sum} text records from HDFS
 * into an HBase table, one row per phone number.
 *
 * <p>Each output row has the key {@code "fc-" + phone} and two columns in family
 * {@code family2}: {@code cellphone} (the phone number) and {@code sum} (the total,
 * stored as its decimal string form).
 */
public class CellphoneSum {
    private static final Logger log = Logger.getLogger(CellphoneSum.class);

    /** Name of the HBase table the reducer writes to. */
    private static final String TABLE = "myHBase";

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "hdfs://Hadoop2-Master:9000/temp/ncmdp/part-r-00000";

    /** Counter group/name used to report input lines that could not be parsed. */
    private static final String COUNTER_GROUP = "CellphoneSum";
    private static final String COUNTER_MALFORMED = "MALFORMED_LINES";

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args optional; {@code args[0]} overrides the default HDFS input path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://Hadoop2-Master:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "Hadoop2-Master,Hadoop2-Slave1,Hadoop2-Slave2");

        Path inpath = new Path(args.length > 0 ? args[0] : DEFAULT_INPUT_PATH);

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, CellphoneSum.class.getSimpleName());
        job.setJarByClass(CellphoneSum.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, inpath);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CellphoneWritable.class);

        // Registers MyReduce as the reducer and wires the job's output to the HBase table,
        // so no separate setReducerClass call is needed.
        TableMapReduceUtil.initTableReducerJob(TABLE, MyReduce.class, job);

        // Propagate job failure to the caller instead of always exiting with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses one {@code phone<TAB>sum} line and emits it keyed by the phone number.
     * Lines with fewer than two fields or a non-numeric second field are counted and skipped
     * rather than failing the whole task.
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, CellphoneWritable> {
        // Reused across calls; context.write serializes the key before map() returns.
        private final Text outKey = new Text();

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            String line = v1.toString();
            String[] ss = line.split("\t");

            // Per-record tracing is kept at DEBUG so large inputs do not flood the task logs.
            if (log.isDebugEnabled()) {
                log.debug("---->" + line + "---" + ss.length);
            }

            if (ss.length < 2) {
                skipMalformed(context, k1, line, null);
                return;
            }

            CellphoneWritable value;
            try {
                // trim() tolerates a trailing '\r' left behind by CRLF-terminated input.
                value = new CellphoneWritable(ss[0], ss[1].trim());
            } catch (NumberFormatException e) {
                skipMalformed(context, k1, line, e);
                return;
            }

            outKey.set(ss[0]);
            context.write(outKey, value);
        }

        /** Records a skipped input line in the job counters and the task log. */
        private void skipMalformed(Context context, LongWritable offset, String line,
                Exception cause) {
            context.getCounter(COUNTER_GROUP, COUNTER_MALFORMED).increment(1);
            log.warn("Skipping malformed line at offset " + offset.get() + ": " + line, cause);
        }
    }

    /**
     * Writes one HBase row per phone number. When a key carries several values their sums
     * are added together, so the stored result does not depend on the order in which the
     * values arrive; with a single value per key the row holds exactly that value.
     */
    static class MyReduce extends TableReducer<Text, CellphoneWritable, ImmutableBytesWritable> {
        // Bytes.toBytes encodes as UTF-8 regardless of the JVM's default charset.
        private static final byte[] FAMILY = Bytes.toBytes("family2");
        private static final byte[] CELLPHONE = Bytes.toBytes("cellphone");
        private static final byte[] SUM = Bytes.toBytes("sum");

        @Override
        protected void reduce(Text k2, Iterable<CellphoneWritable> v2s, Context context)
                throws IOException, InterruptedException {
            String phone = null;
            long total = 0L;
            for (CellphoneWritable v : v2s) {
                if (phone == null) {
                    phone = v.phone;
                }
                total += v.sum;
            }
            if (phone == null) {
                // No values for this key: nothing to store.
                return;
            }

            byte[] key = Bytes.toBytes("fc-" + k2.toString());
            Put put = new Put(key);
            // NOTE(review): newer HBase clients deprecate Put.add(byte[], byte[], byte[]) in
            // favor of addColumn; kept as-is because the targeted HBase version is not visible
            // from this file.
            put.add(FAMILY, CELLPHONE, Bytes.toBytes(phone));
            // The total is stored as decimal text, matching the original on-disk format.
            put.add(FAMILY, SUM, Bytes.toBytes(Long.toString(total)));
            context.write(new ImmutableBytesWritable(key), put);
        }
    }
}
/**
 * Map-output value pairing a phone number with a numeric total.
 *
 * <p>Wire format: the phone number via {@link DataOutput#writeUTF(String)} followed by the
 * total via {@link DataOutput#writeLong(long)}. The fields are package-visible because the
 * reducer in {@code CellphoneSum} reads them directly.
 */
class CellphoneWritable implements Writable {
    /** Phone number; may be {@code null} on an instance that has not been populated yet. */
    String phone;

    /** Numeric total associated with {@link #phone}. */
    long sum;

    /** No-argument constructor; fields are filled in later by {@link #readFields(DataInput)}. */
    public CellphoneWritable() {
    }

    /**
     * Builds a value from the two text fields of an input record.
     *
     * @param phone the phone number
     * @param sum   the total as decimal text; surrounding whitespace (including a trailing
     *              carriage return from CRLF input) is ignored
     * @throws NumberFormatException if {@code sum} is not a valid {@code long}
     */
    public CellphoneWritable(String phone, String sum) {
        this.phone = phone;
        this.sum = Long.parseLong(sum.trim());
    }

    /** Restores both fields in the order {@link #write(DataOutput)} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        sum = in.readLong();
    }

    /**
     * Serializes the phone number followed by the total. A {@code null} phone is written as
     * the empty string because {@code writeUTF} does not accept {@code null}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone == null ? "" : phone);
        out.writeLong(sum);
    }

    /** Returns {@code "---->" + phone + TAB + sum}. */
    @Override
    public String toString() {
        return "---->" + phone + "\t" + sum;
    }
}
分享出来，大家可以一起学习。