lzw 发表于 2013-12-10 23:19:20

Hadoop MapReduce统计结果直接输出hbase

本帖最后由 lzw 于 2013-12-10 23:18 编辑

MapReduce统计结果直接输出hbase,我使用的是hadoop1.0.4版本和hbase 0.94版本,hadoop和hbase安装伪分布式。1.hadoop安装这里就不讲了。
2.hbase安装我这里将一下。
首页解压habase安装包到/home/hadoop目录。
配置hosts文件如下:192.168.0.101   hadoop.master配置hbase-site.xml,配置内容如下:<configuration>
<property>
   <name>hbase.rootdir</name>
   <value>hdfs://hadoop.master:9000/hbase</value>
</property>
<property>
   <name>hbase.cluster.distributed</name>
   <value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop.master</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/hadoop/zookeeper</value>
</property>
<property>
<name>hbase.regionserver.handler.count</name>
<value>100</value>
</property>
<property>
<name>hbase.hregion.max.filesize</name>
<value>8589934592</value>
</property>
<property>
<name>hfile.block.cache.size</name>
<value>0.3</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>hbase-site.xml配置完后,在配置hbase-env.sh,我只把其中配置的如下显示:export JAVA_HOME=/usr/jdk1.6.0_22
export HBASE_OPTS="-XX:+UseConcMarkSweepGC"
export HBASE_MANAGES_ZK=true上面最后一项一定要打开。设置zookeeper管理hbase。

最后配置regionservers 如下:hadoop.master注意:如果需要在本地连接hbase,需要关闭防火墙,执行命令/sbin/service iptables stop

接下来启动hbase,创建表:TestCars,列族Car:
准备数据:Acura,Integra,Small
Acura,Legend,Midsize
Audi,90,Compact
Audi,100,Midsize
BMW,535i,Midsize
Buick,Century,Midsize
Buick,LeSabre,Large
Buick,Roadmaster,Large
Buick,Riviera,Midsize
Cadillac,DeVille,Large
Cadillac,Seville,Midsize将数据上传hadoop文件系统: hadoop fs -copyfromLocal /home/hadoop/Car.txt /home/hadoop/input在运行mapreduce时需要将hbase-0.94.6.jar 、zookeeper-3.4.5.jar、protobuf-java-2.4.0a.jar添加到hadoop lib目录下,或者另一种方式在执行mapreduce时,导入前面三个包。
下面是实现的具体代码:package com.duplicate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OutputHbase {

      private static Logger logger = LoggerFactory.getLogger(OutputHbase.class);
      
      public static class Map extends Mapper<Object,Text,Text,Text>{
                private Text outKey = new Text();
                private Text outVal = new Text();
               
                public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
                        String[] valueSplitted = value.toString().split(",");
                        if(valueSplitted.length == 3){
                              String brand = valueSplitted;
                  String model = valueSplitted;
                  String size = valueSplitted;
                              outKey.set(brand);
                              outVal.set(model + "," + size);
                              context.write(outKey, outVal);
                        }
                }
      }
      
      public static class Reduce extends Reducer<Text,Text,Text,Text>{
                private HTablePool pool = null;
                private HTableInterface testHTable = null;
                private List<Put> testListPut = new ArrayList<Put>();
               
                @Override
                public void setup(Context context){
                         Configuration conf = HBaseConfiguration.create();
                         conf.set("hbase.zookeeper.quorum", "192.168.0.101");
                     pool = new HTablePool(conf, 10);
                     testHTable = pool.getTable("TestCars");
                }
               
                @Override
                public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
                        String brand = key.toString();
               
                        for(Text tx : values){
                              String[] valueSplitted = tx.toString().split(",");
                              if(valueSplitted.length == 2){
                                        String model = valueSplitted;
                            String size = valueSplitted;
                           
                            byte[] putKey = Bytes.toBytes(brand+","+model);
                            byte[] putFmaily = Bytes.toBytes("Car");
                            Put put = new Put(putKey);
                           
                            byte[] putQ = Bytes.toBytes("brand");
                            byte[] putVal = Bytes.toBytes(brand);
                            put.add(putFmaily,putQ,putVal);
                           
                            putQ = Bytes.toBytes("model");
                            putVal = Bytes.toBytes(model);
                            put.add(putFmaily,putQ,putVal);
                           
                            putQ = Bytes.toBytes("size");
                            putVal = Bytes.toBytes(size);
                            put.add(putFmaily,putQ,putVal);
                            testListPut.add(put);
                              }
                        }// End for
                        
                testHTable.put(testListPut);
                        testHTable.flushCommits();
                }
               
                @Override
                public void cleanup(Context context)throws IOException{
                        if(null != testHTable){
                              testHTable.close();
                        }
                        
                        if(null != pool){
                              pool.close();
                        }
                }
      }
      
      /**
         * @param args
         * @throws IOException
         * @throws ClassNotFoundException
         * @throws InterruptedException
         */
      public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
                // TODO Auto-generated method stub
                Configuration conf = new Configuration();
      
               
                Job job = new Job(conf,"OutputHbase");
                //TableMapReduceUtil.addDependencyJars(job);
                job.setJarByClass(OutputHbase.class);
                job.setMapperClass(Map.class);
                job.setReducerClass(Reduce.class);
               
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                FileInputFormat.addInputPath(job, new Path(args));
                FileOutputFormat.setOutputPath(job, new Path(args));
                System.exit(job.waitForCompletion(true)?0:1);
      }

}
执行方式:hadoop jar /home/hadoop/dedup.jar com.duplicate.OutputHbase /home/hadoop/input/* /home/hadoop/output查看结果:两种方式一种直接在hbase客户查看,另一种是用程序直接读出来:
hbase客户端查询:scan 'TestCars','Car';java代码查询,我下面只查询了主键key值:package com.duplicate.local;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ConnectionHbase {
        private static HTablePool pool = null;

        /**
       * @param args
       */
        public static void main(String[] args) {
                ConnectionHbase hbase = new ConnectionHbase();
                hbase.run();
        }

        public void run() {
                // TODO Auto-generated method stub
                Configuration conf = HBaseConfiguration.create();
                HTableInterface testHTable = null;
                conf.set("hbase.zookeeper.quorum", "192.168.0.101");
                pool = new HTablePool(conf, 10);

                testHTable = pool.getTable("TestCars");
                Scan scan = new Scan();
                try {
                        ResultScanner res = testHTable.getScanner(scan);
                        for(Result rs : res){
                                System.out.println(Bytes.toString(rs.getRow()));
                        }
                } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
        }

}

MatrixPlus 发表于 2017-3-3 17:13:23

言简意赅的好例子

cctcliuli 发表于 2017-9-2 09:15:20

谢谢分享
页: [1]
查看完整版本: Hadoop MapReduce统计结果直接输出hbase