立即注册 登录
About云-梭伦科技 返回首页

夜空的个人空间 https://aboutyun.com/?45784 [收藏] [复制] [分享] [RSS]

日志

mapreduce之-----HDFS大批量导入HBASE(map)

已有 907 次阅读2017-1-8 09:51 |个人分类:HBASE| MAP, HBASE, MAPREDUCE

场景:读取HDFS文件,做处理,写入HBASE
命令:hadoop jar NdbgMR.jar com.cn.count.HBaseMR 201602 /user/***/part-*

package com.cn.count;
/**
 * Copyright (c) 2016***. All rights reserved.
 * 
 * Description: ndbgBill
 * 
 * <pre>
 * Modified log:
 * ------------------------------------------------------
 * Ver. Date Author Description
 * ------------------------------------------------------
 * 1.0 2016年12月20日 zgr created.
 * </pre>
 */
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BillToOne extends Configured implements Tool {

private static HTableDescriptor htd;
private static String C_dateMOnth = "c_month";
// public static Configuration conf = new Configuration();
private static boolean isTestCluster = true;

public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
// TODO Auto-generated method stub
if (args.length < 2) {
System.err.println("Usage: BillToHbase <dataMonth> <pathin>");
System.exit(1);
}

System.err.println("start run job .....");
try {
ToolRunner.run(new BillToOne(), args);
} catch (Exception e) {
e.printStackTrace();
}
System.err.println("stop run job .....");
}

public int run(String[] args) throws Exception {
String tablename = "ndbgBill";
Configuration conf = new Configuration();
conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
conf.set(C_dateMOnth, args[0]);
Job job = new Job(conf, "ndbgBill table");
job.setJarByClass(BillToOne.class);
// job.setNumReduceTasks(3);
// conf.set(C_dateMOnth, "201606");
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%"+args[0]);

job.getConfiguration().setInt("mapreduce.reduce.maxattempt", 8);
job.getConfiguration().setInt("mapreduce.map.maxattempt", 8);
job.getConfiguration().setFloat(
"mapreduce.reduce.shuffle.memory.limit.percent", 0.05f); // 0.05

job.getConfiguration().setInt("mapreduce.map.memory.mb", 2048);

job.getConfiguration().setInt("mapreduce.reduce.memory.mb", 1536);

job.getConfiguration().set("mapreduce.map.java.opts",
"-Xms512m -Xmx1024m");
job.getConfiguration().set("mapreduce.reduce.java.opts",
"-Xms512m -Xmx1024m");

// job.setNumReduceTasks(0);

if (isTestCluster) {
job.getConfiguration()
.set("hbase.zookeeper.quorum",
"cdh-test-02.nm-cloud.internal,cdh-test-01.nm-cloud.internal,cdh-test-03.nm-cloud.internal");
} else {
job.getConfiguration()
.set("hbase.zookeeper.quorum",
"zhpt-bd-coordinate-02.e.189.cn,zhpt-bd-coordinate-04.e.189.cn,zhpt-bd-coordinate-06.e.189.cn");
}

job.getConfiguration().set("hbase.zookeeper.property.clientPort",
"2181");

job.getConfiguration().set("hbase.security.authentication", "kerberos");
job.getConfiguration().set("hbase.regionserver.kerberos.principal",
"hbase/_HOST@E.189.CN");

TableMapReduceUtil.initCredentials(job);
// createHBaseTable(tablename);

job.setMapperClass(Map.class);
// job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
// job.setOutputFormatClass(NullOutputFormat.class);
// 设置输入目录
FileInputFormat.addInputPath(job, new Path(args[1]));
// FileInputFormat.addInputPath(job, new Path("hdfs://centos:9000/user/dataA/input3/*")); 

return job.waitForCompletion(true) ? 0 : 1;
}

public static class Map extends Mapper<LongWritable, Text, Text, Text> {
private final static IntWritable one = new IntWritable();

private static List<Put> puts = new ArrayList<Put>();

private HTableInterface table;

private String dateMOnth;

@SuppressWarnings("deprecation")
@Override
protected void setup(
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
super.setup(context);
Configuration cfg = context.getConfiguration();
HBaseAdmin admin=new HBaseAdmin(cfg);
@SuppressWarnings({ "deprecation", "deprecation" })
HConnection connection = HConnectionManager.createConnection(cfg);
table = connection.getTable("ndbgBill");
this.table.setWriteBufferSize(200); // 设置writebuffersize,当write
// buffer写满的时候才会flush
this.table.setAutoFlushTo(false); // 禁止自动刷写

// dateMOnth = conf.get(C_dateMOnth);
dateMOnth = cfg.get(C_dateMOnth);
System.out.println("datamonth======================="+dateMOnth);

}

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String s = value.toString();
System.out.println(s+"++*********************************************");

String[] strings = s.split("\\|");

if (strings == null || strings.length == 0) {
return;
}
// judge the input file is amount_bill or used_bill
if (strings.length == 6) {// amount_bil
StringBuilder sb = new StringBuilder();
for (int i = 2; i < strings.length; i++) {
if (strings[i].trim().equals("")) {
}else if(i==strings.length-1){
sb.append(strings[i] );
}else{
sb.append(strings[i] + "|");
}
}
String st = new String(sb);

String rowkey = reverse(strings[1]) + dateMOnth;

Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("used_bil"),
Bytes.toBytes(st));
if(put.isEmpty()){
System.err.println("1  put is empty");
}
// context.write(new Text(rowkey),new Text(st));
puts.add(put);
table.put(put);
if (puts.size() == 200) {
table.put(puts);
puts.clear();
}
} else if (strings.length == 14) {
StringBuilder sb = new StringBuilder();
for (int i = 1; i < strings.length; i++) {
if (strings[i].trim().equals("")) {
}else if(i==strings.length-1){
sb.append(strings[i] );
}else{
sb.append(strings[i] + "|");
}
}
System.out.println(sb.toString()+"=============");
String st = new String(sb);
String rowkey = reverse(strings[0]) + dateMOnth;

Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("amount_bil"),
Bytes.toBytes(st));
table.put(put);
if(put.isEmpty()){
System.err.println("1  put is empty");
}
puts.add(put);
if (puts.size() == 200) {
table.put(puts);
puts.clear();
}
}
}
protected void cleanup(Context context) throws IOException,
InterruptedException {
if (puts != null && puts.size() > 0) {
table.put(puts);
this.puts.clear();
}
if (table != null) {
try {
table.close();
} catch (Exception e) {
}
}
}

public static String reverse(String string) {

String reve = "";
for (int i = 0; i < string.length(); i++) {
reve += string.charAt(string.length() - 1 - i);
}
return reve.toString();
}
}
public static void  createHBaseTable(String tablename)
throws IOException {

htd = new HTableDescriptor(tablename);
// create a family 'used_bil' to write used_bill data
HColumnDescriptor col = new HColumnDescriptor("used_bil");
// create a family 'amount_bil' to write amount_bill data
HColumnDescriptor coll = new HColumnDescriptor("amount_bil");

// fill the family to table
htd.addFamily(col);
htd.addFamily(coll);
Configuration cfg = HBaseConfiguration.create();
@SuppressWarnings("resource")
HBaseAdmin admin = new HBaseAdmin(cfg);

if (admin.tableExists(tablename)) {
System.out.println("table exists,trying recreate table!");
// admin.disableTable(tablename);
// admin.deleteTable(tablename);
// admin.createTable(htd);
}else {
System.out.println("create new table:" + tablename);
admin.createTable(htd);
}
}

// @SuppressWarnings("deprecation")
// public static HTableInterface getHtable() throws IOException{
// Configuration cfg = HBaseConfiguration.create();
//
//
// HConnection connection = HConnectionManager.createConnection(cfg);
//
// HTableInterface table = connection.getTable("ndbg_bill");
//
// return table;
// }
public static HTableInterface getHtable() throws IOException{
Configuration cfg = HBaseConfiguration.create();  
         
       
HConnection connection = HConnectionManager.createConnection(cfg);
   
HTableInterface table = connection.getTable("ndbg_bill");
   
    return table;
}
}

路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条