立即注册 登录
About云-梭伦科技 返回首页

夜空的个人空间 https://aboutyun.com/?45784 [收藏] [复制] [分享] [RSS]

日志

mapreduce之-----HDFS大批量导入HBASE(map)

已有 866 次阅读2017-2-7 16:46 |个人分类:HBASE| HBASE, MAP, HDFS

/**
 * Copyright (c) 2017 21CN.COM . All rights reserved.
 *
 * Description: HBaseTo
 * 
 * <pre>
 * Modified log:
 * ------------------------------------------------------
 * Ver. Date Author Description
 * ------------------------------------------------------
 * 1.0 2017年1月12日 love created.
 * </pre>
 */
package com.cn.count;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadHDFSToHbase extends Configured implements Tool {

    private static HTableDescriptor htd;

    public static Configuration conf;

    private static boolean isTestCluster = true;

    @SuppressWarnings("unused")
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        if (args.length < 1) {
            System.err.println("Usage: HDFS TO HBASE  <inputPath> ");
            System.exit(1);
        }

        System.err.println("start run job .....");
        try {
            ToolRunner.run(new ReadHDFSToHbase(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.err.println("stop run job .....");
    }

    public int run(String[] args) throws Exception {
//        String tablename = "credit:ndbgBill";
        String tablename = "ndbgBi";
        conf = new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);

        Job job = new Job(conf, "credit:ndbgBill table");
        job.setJarByClass(ReadHDFSToHbase.class);

        job.getConfiguration().setInt("mapreduce.reduce.maxattempt", 8);
        job.getConfiguration().setInt("mapreduce.map.maxattempt", 8);
        job.getConfiguration().setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.05f); // 0.05

        job.getConfiguration().setInt("mapreduce.map.memory.mb", 2048);

        job.getConfiguration().setInt("mapreduce.reduce.memory.mb", 1536);

        job.getConfiguration().set("mapreduce.map.java.opts", "-Xms512m -Xmx1024m");
        job.getConfiguration().set("mapreduce.reduce.java.opts", "-Xms512m -Xmx1024m");

        // job.setNumReduceTasks(0);

        if (isTestCluster) {
            job.getConfiguration().set("hbase.zookeeper.quorum",
                "cdh-test-02.nm-cloud.internal,cdh-test-01.nm-cloud.internal,cdh-test-03.nm-cloud.internal");
        } else {
            job.getConfiguration().set("hbase.zookeeper.quorum",
                "zhpt-bd-coordinate-02.e.189.cn,zhpt-bd-coordinate-04.e.189.cn,zhpt-bd-coordinate-06.e.189.cn");
        }

        job.getConfiguration().set("hbase.zookeeper.property.clientPort", "2181");

        job.getConfiguration().set("hbase.security.authentication", "kerberos");
        job.getConfiguration().set("hbase.regionserver.kerberos.principal", "hbase/_HOST@E.189.CN");

        TableMapReduceUtil.initCredentials(job);

        job.setMapperClass(Map.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        // FileInputFormat.addInputPath(job, new Path("/home/test/data/mv/part*"));

        return job.waitForCompletion(true)? 0: 1;
    }

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        private final static IntWritable one = new IntWritable();

        private static List<Put> puts = new ArrayList<Put>();

        @SuppressWarnings("deprecation")
        private HTableInterface table;

        @SuppressWarnings("deprecation")
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException,
                                                                                    InterruptedException {
            super.setup(context);

            Configuration cfg = context.getConfiguration();

            HBaseAdmin admin = new HBaseAdmin(cfg);
            @SuppressWarnings({"deprecation", "deprecation"})
            HConnection connection = HConnectionManager.createConnection(cfg);
            table = connection.getTable("credit:ndbgBill");
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            @SuppressWarnings({"unused", "unused"})
            HashMap<String, String> map = new HashMap<>();
            String string = new String(value.toString());
            if (string == null || string.isEmpty()) {

            } else {
                @SuppressWarnings("deprecation")
                String keyString = null;
                System.out.println(string);
                String[] strings = string.split("\\|");
                for (int i = 0; i < strings.length; i++) {
                    if (i == 0) {
                        keyString = strings[i].trim();
                        System.out.println(keyString);

                    } else {
                        String[] strings2 = strings[i].split(":");
                        String mkey = "", mvalue = "";
                        for (int j = 0; j < strings2.length; j++) {
                            if (j == 0) {
                                mkey = strings2[j];
                            }
                            if (j == 1) {
                                mvalue = strings2[j];
                            }
                        }
                        map.put(mkey, mvalue);
                    }
                }
                Put put = new Put(Bytes.toBytes(keyString));
                for (Entry<String, String> s: map.entrySet()) {
                    // System.out.println(s.getKey() + "    " + s.getValue());
                    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(s.getKey()), Bytes.toBytes(s.getValue()));
                }
                puts.add(put);
                if (puts.size() == 200) {
                    table.put(puts);
                    puts.clear();
                    map.clear();
                }
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (puts != null && puts.size() > 0) {
                table.put(puts);
                this.puts.clear();
            }
            if (table != null) {
                try {
                    table.close();
                } catch (Exception e) {

                }
            }
        }
    }

}


路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条