立即注册 登录
About云-梭伦科技 返回首页

夜空的个人空间 https://aboutyun.com/?45784 [收藏] [复制] [分享] [RSS]

日志

mapreduce之-----HBASE大批量导入HDFS

已有 1076 次阅读2017-2-7 16:48 |个人分类:HBASE| HADOOP, MAPREDUCE, HBASE

package com.cn.count;

//com.cn.count.hbaseToHdfs
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.Map.Entry;

import javax.swing.text.html.parser.Entity;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class hbaseToHdfs {

     public static class Mapper extends TableMapper<Text, Text> {

         @Override
         protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException,
                                                                                      InterruptedException {
             StringBuffer sb = new StringBuffer("");
             for (Entry<byte[], byte[]> entry: value.getFamilyMap("cf".getBytes()).entrySet()) {
                 String str = new String(entry.getValue());
                 if (str != null) {
                     sb.append("|");
                     sb.append(new String(entry.getKey()));
                     sb.append(":");
                     sb.append(str);

                 }

             }
             context.write(new Text(key.get()), new Text(new String(sb)));
         }
     }

     public static class reducer extends Reducer<Text, Text, Text, Text> {
         private Text result = new Text();

         @Override
         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                                                                                InterruptedException {
             for (Text val: values) {
                 result.set(val);
                 context.write(key, result);
             }
         }
     }


     private static boolean isTestCluster = true;
    public static void main(String[] args) throws Exception {
        
        String tablename = "ndbgBill";
        Configuration conf = HBaseConfiguration.create();
      
     
        Job job = new Job(conf, "WordCountHbaseReader");
        job.setJarByClass(hbaseToHdfs.class);
        
        job.getConfiguration().setInt("mapred.task.timeout", 1600000);
        job.getConfiguration().setInt("mapreduce.job.reduces", 3);
        // job.getConfiguration().setInt("mapred.reduce.parallel.copies", 5);
        // job.getConfiguration().setInt("mapred.reduce.copy.backoff", 600);

        job.getConfiguration().setInt("mapreduce.reduce.maxattempt", 8);
        job.getConfiguration().setInt("mapreduce.map.maxattempt", 8);
        job.getConfiguration().setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.05f); // 0.05

        job.getConfiguration().setInt("mapreduce.map.memory.mb", 2048);

        job.getConfiguration().setInt("mapreduce.reduce.memory.mb", 1536);

        job.getConfiguration().set("mapreduce.map.java.opts", "-Xms512m -Xmx1024m");
        job.getConfiguration().set("mapreduce.reduce.java.opts", "-Xms512m -Xmx1024m");

        if (isTestCluster) {
            job.getConfiguration().set("hbase.zookeeper.quorum",
                "cdh-test-02.nm-cloud.internal,cdh-test-01.nm-cloud.internal,cdh-test-03.nm-cloud.internal");
        } else {
            job.getConfiguration().set("hbase.zookeeper.quorum",
                "zhpt-bd-coordinate-02.e.189.cn,zhpt-bd-coordinate-04.e.189.cn,zhpt-bd-coordinate-06.e.189.cn");
        }

        job.getConfiguration().set("hbase.zookeeper.property.clientPort", "2181");

        job.getConfiguration().set("hbase.security.authentication", "kerberos");
        job.getConfiguration().set("hbase.regionserver.kerberos.principal", "hbase/_HOST@E.189.CN");

        job.getConfiguration().set("hadoop.security.authorization", "true");
        job.getConfiguration().set("hadoop.security.authentication", "kerberos");

        job.getConfiguration().set("hbase.rpc.engine", "org.apache.hadoop.hbase.ipc.SecureRpcEngine");
        job.getConfiguration().set("hbase.security.authorization=", "true");

        job.getConfiguration().set("hbase.security.authentication", "kerberos");
        job.getConfiguration().set("hbase.regionserver.kerberos.principal", "hbase/_HOST@NM-CLOUD.INTERNAL");

        job.getConfiguration().set("hbase.master.kerberos.principal", "hbase/_HOST@NM-CLOUD.INTERNAL");
        job.getConfiguration().set("hbase.regionserver.kerberos.principal", "hbase/_HOST@NM-CLOUD.INTERNAL");

        
        
        //设置任务数据的输出路径;
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        job.setMapperClass(Mapper.class);
        job.setReducerClass(reducer.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(tablename,scan,Mapper.class, Text.class, Text.class, job);
        //调用job.waitForCompletion(true) 执行任务,执行成功后退出;
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        System.err.println("success ");
    }
}
 

路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条