As a supplement: inserting data into HBase with MapReduce:
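The flow is straightforward: the job reads comma-separated records from HDFS, the mapper uses RecordParser (shown further down) to extract each phone number and blood-pressure reading and emits only readings above 150, and the TableReducer wraps each surviving pair in a Put and writes it into column family f1 of the HBase table tab1.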
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.hbase.log.RecordParser;

public class HbaseInsertData {

    public static class HbaseMapper
            extends Mapper<LongWritable, Text, Text, Text> {

        RecordParser parser = new RecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            parser.parse(value);
            String phone = parser.getPhone();
            int bloodPressure = parser.getBloodPressure();
            // Only emit records whose blood-pressure reading exceeds 150.
            if (bloodPressure > 150) {
                context.write(new Text(phone), new Text(bloodPressure + ""));
            }
        }
    }

    public static class HbaseReducer
            extends TableReducer<Text, Text, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String value = values.iterator().next().toString();
            // Don't use Text.getBytes() directly as a row key: it returns the
            // backing buffer, which can be longer than the actual content.
            byte[] rowKey = Bytes.toBytes(key.toString());
            Put putRow = new Put(rowKey);
            putRow.add(Bytes.toBytes("f1"), Bytes.toBytes("qualifier"),
                    Bytes.toBytes(value));
            context.write(new ImmutableBytesWritable(rowKey), putRow);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Do not forget this setting. Note the property name is
        // "hbase.zookeeper.quorum" with no trailing dot.
        conf.set("hbase.zookeeper.quorum", "localhost");
        Job job = new Job(conf, "count");
        job.setJarByClass(HbaseInsertData.class);
        job.setMapperClass(HbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        Path in = new Path("hdfs://localhost:9000/input");
        FileInputFormat.addInputPath(job, in);
        // Wires the reducer to HBase table "tab1"; this call also sets the
        // output format and the reducer output key/value classes.
        TableMapReduceUtil.initTableReducerJob("tab1", HbaseReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
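One thing to keep in mind: initTableReducerJob does not create the target table, so "tab1" with column family "f1" must exist before the job runs. Below is a minimal sketch of creating it with the older HBaseAdmin client API (the same API generation the listing uses); the CreateTab1 class name is mine, everything else follows the job code above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTab1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Create table "tab1" with the single column family "f1"
        // that the reducer writes to, unless it already exists.
        if (!admin.tableExists("tab1")) {
            HTableDescriptor desc = new HTableDescriptor("tab1");
            desc.addFamily(new HColumnDescriptor("f1"));
            admin.createTable(desc);
        }
        admin.close();
    }
}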
The parsing helper class RecordParser:

import org.apache.hadoop.io.Text;

public class RecordParser {

    private String id;
    private String phone;
    private int bloodPressure;

    // Records are comma-separated; field 1 is the id, field 3 the phone
    // number, and field 4 the blood-pressure reading.
    public void parse(String record) {
        String[] logs = record.split(",");
        id = logs[1];
        phone = logs[3];
        bloodPressure = Integer.parseInt(logs[4]);
    }

    public void parse(Text record) {
        this.parse(record.toString());
    }

    public String getId() {
        return id;
    }

    public String getPhone() {
        return phone;
    }

    public int getBloodPressure() {
        return bloodPressure;
    }
}
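The original does not show the exact log layout, but from the indices used a record needs at least five comma-separated fields. A quick check with a hypothetical line (the field values here are made up for illustration):

public class RecordParserDemo {
    public static void main(String[] args) {
        // Hypothetical record: field 0 is ignored, field 1 = id,
        // field 2 is ignored, field 3 = phone, field 4 = blood pressure.
        String line = "2023-01-01,u001,unused,13800000000,165";
        RecordParser parser = new RecordParser();
        parser.parse(line);
        System.out.println(parser.getId());            // u001
        System.out.println(parser.getPhone());         // 13800000000
        System.out.println(parser.getBloodPressure()); // 165
    }
}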