本帖最后由 lzw 于 2013-12-10 23:18 编辑
MapReduce统计结果直接输出hbase,我使用的是hadoop1.0.4版本和hbase 0.94版本,hadoop和hbase安装伪分布式。1.hadoop安装这里就不讲了。
2.hbase安装我这里将一下。
首页解压habase安装包到/home/hadoop目录。
配置hosts文件如下:
- 192.168.0.101 hadoop.master
复制代码
配置hbase-site.xml,配置内容如下:- <configuration>
- <property>
- <name>hbase.rootdir</name>
- <value>hdfs://hadoop.master:9000/hbase</value>
- </property>
- <property>
- <name>hbase.cluster.distributed</name>
- <value>true</value>
- </property>
- <property>
- <name>hbase.zookeeper.quorum</name>
- <value>hadoop.master</value>
- </property>
- <property>
- <name>hbase.zookeeper.property.dataDir</name>
- <value>/home/hadoop/zookeeper</value>
- </property>
- <property>
- <name>hbase.regionserver.handler.count</name>
- <value>100</value>
- </property>
- <property>
- <name>hbase.hregion.max.filesize</name>
- <value>8589934592</value>
- </property>
- <property>
- <name>hfile.block.cache.size</name>
- <value>0.3</value>
- </property>
- <property>
- <name>dfs.replication</name>
- <value>1</value>
- </property>
- </configuration>
复制代码
hbase-site.xml配置完后,在配置hbase-env.sh,我只把其中配置的如下显示:- export JAVA_HOME=/usr/jdk1.6.0_22
- export HBASE_OPTS="-XX:+UseConcMarkSweepGC"
- export HBASE_MANAGES_ZK=true
复制代码
上面最后一项一定要打开。设置zookeeper管理hbase。
最后配置regionservers 如下:复制代码 注意:如果需要在本地连接hbase,需要关闭防火墙,执行命令 /sbin/service iptables stop
接下来启动hbase,创建表:TestCars,列族Car:
准备数据:- Acura,Integra,Small
- Acura,Legend,Midsize
- Audi,90,Compact
- Audi,100,Midsize
- BMW,535i,Midsize
- Buick,Century,Midsize
- Buick,LeSabre,Large
- Buick,Roadmaster,Large
- Buick,Riviera,Midsize
- Cadillac,DeVille,Large
- Cadillac,Seville,Midsize
复制代码
将数据上传hadoop文件系统:- hadoop fs -copyfromLocal /home/hadoop/Car.txt /home/hadoop/input
复制代码
在运行mapreduce时需要将hbase-0.94.6.jar 、zookeeper-3.4.5.jar、protobuf-java-2.4.0a.jar添加到hadoop lib目录下,或者另一种方式在执行mapreduce时,导入前面三个包。
下面是实现的具体代码:- package com.duplicate;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTableInterface;
- import org.apache.hadoop.hbase.client.HTablePool;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class OutputHbase {
-
- private static Logger logger = LoggerFactory.getLogger(OutputHbase.class);
-
- public static class Map extends Mapper<Object,Text,Text,Text>{
- private Text outKey = new Text();
- private Text outVal = new Text();
-
- public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
- String[] valueSplitted = value.toString().split(",");
- if(valueSplitted.length == 3){
- String brand = valueSplitted[0];
- String model = valueSplitted[1];
- String size = valueSplitted[2];
- outKey.set(brand);
- outVal.set(model + "," + size);
- context.write(outKey, outVal);
- }
- }
- }
-
- public static class Reduce extends Reducer<Text,Text,Text,Text>{
- private HTablePool pool = null;
- private HTableInterface testHTable = null;
- private List<Put> testListPut = new ArrayList<Put>();
-
- @Override
- public void setup(Context context){
- Configuration conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", "192.168.0.101");
- pool = new HTablePool(conf, 10);
- testHTable = pool.getTable("TestCars");
- }
-
- @Override
- public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
- String brand = key.toString();
-
- for(Text tx : values){
- String[] valueSplitted = tx.toString().split(",");
- if(valueSplitted.length == 2){
- String model = valueSplitted[0];
- String size = valueSplitted[1];
-
- byte[] putKey = Bytes.toBytes(brand+","+model);
- byte[] putFmaily = Bytes.toBytes("Car");
- Put put = new Put(putKey);
-
- byte[] putQ = Bytes.toBytes("brand");
- byte[] putVal = Bytes.toBytes(brand);
- put.add(putFmaily,putQ,putVal);
-
- putQ = Bytes.toBytes("model");
- putVal = Bytes.toBytes(model);
- put.add(putFmaily,putQ,putVal);
-
- putQ = Bytes.toBytes("size");
- putVal = Bytes.toBytes(size);
- put.add(putFmaily,putQ,putVal);
- testListPut.add(put);
- }
- }// End for
-
- testHTable.put(testListPut);
- testHTable.flushCommits();
- }
-
- @Override
- public void cleanup(Context context)throws IOException{
- if(null != testHTable){
- testHTable.close();
- }
-
- if(null != pool){
- pool.close();
- }
- }
- }
-
- /**
- * @param args
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- // TODO Auto-generated method stub
- Configuration conf = new Configuration();
-
-
- Job job = new Job(conf,"OutputHbase");
- //TableMapReduceUtil.addDependencyJars(job);
- job.setJarByClass(OutputHbase.class);
- job.setMapperClass(Map.class);
- job.setReducerClass(Reduce.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- System.exit(job.waitForCompletion(true)?0:1);
- }
-
- }
复制代码
执行方式:- hadoop jar /home/hadoop/dedup.jar com.duplicate.OutputHbase /home/hadoop/input/* /home/hadoop/output
复制代码
查看结果:两种方式一种直接在hbase客户查看,另一种是用程序直接读出来:
hbase客户端查询:复制代码 java代码查询,我下面只查询了主键key值:- package com.duplicate.local;
-
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTableInterface;
- import org.apache.hadoop.hbase.client.HTablePool;
- import org.apache.hadoop.hbase.client.Result;
- import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.util.Bytes;
-
- public class ConnectionHbase {
- private static HTablePool pool = null;
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- ConnectionHbase hbase = new ConnectionHbase();
- hbase.run();
- }
-
- public void run() {
- // TODO Auto-generated method stub
- Configuration conf = HBaseConfiguration.create();
- HTableInterface testHTable = null;
- conf.set("hbase.zookeeper.quorum", "192.168.0.101");
- pool = new HTablePool(conf, 10);
-
- testHTable = pool.getTable("TestCars");
- Scan scan = new Scan();
- try {
- ResultScanner res = testHTable.getScanner(scan);
- for(Result rs : res){
- System.out.println(Bytes.toString(rs.getRow()));
- }
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }
复制代码
|