/**
* Copyright (c) 2017 21CN.COM . All rights reserved.
*
* Description: MapReduce job that loads pipe-delimited text records from HDFS into an HBase table.
*
* <pre>
* Modified log:
* ------------------------------------------------------
* Ver. Date Author Description
* ------------------------------------------------------
* 1.0 2017-01-12 love created.
* </pre>
*/
package com.cn.count;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * MapReduce driver that reads pipe-delimited text records from HDFS and writes them to an
 * HBase table.
 *
 * <p>Each input line has the form {@code rowKey|qualifier:value|qualifier:value|...}. The first
 * segment (trimmed) is the HBase row key; every following segment becomes one cell in column
 * family {@code cf}. The mapper writes to HBase directly in batches and never emits map output.
 *
 * <p>Usage: {@code hadoop jar <jar> com.cn.count.ReadHDFSToHbase [generic options] <inputPath>}
 */
public class ReadHDFSToHbase extends Configured implements Tool {
    /** HBase table that receives the records; shared by the driver and the mapper. */
    private static final String TABLE_NAME = "credit:ndbgBill";

    /** Configuration used for the most recently submitted job (assigned in {@link #run}). */
    public static Configuration conf;

    /** Selects which hard-coded ZooKeeper quorum is used. */
    private static final boolean IS_TEST_CLUSTER = true;

    /**
     * Command-line entry point. Runs the job through {@link ToolRunner} and terminates the JVM
     * with the job's exit code so that calling scripts can detect failures.
     *
     * @param args generic Hadoop options followed by the HDFS input path
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length < 1) {
            System.err.println("Usage: HDFS TO HBASE <inputPath> ");
            System.exit(1);
        }
        System.err.println("start run job .....");
        int exitCode;
        try {
            exitCode = ToolRunner.run(new ReadHDFSToHbase(), args);
        } catch (Exception e) {
            e.printStackTrace();
            // An exception means the job did not complete; report that to the caller.
            exitCode = 1;
        }
        System.err.println("stop run job .....");
        System.exit(exitCode);
    }

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args remaining arguments after generic-option parsing; {@code args[0]} is the
     *             HDFS input path
     * @return 0 when the job succeeds, 1 when it fails or the input path is missing
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Generic options are stripped before run() is called, so the path can be missing
        // even though main() saw a non-empty argument list.
        if (args == null || args.length < 1) {
            System.err.println("Usage: HDFS TO HBASE <inputPath> ");
            return 1;
        }

        // Start from the configuration ToolRunner populated so -D / -conf options take effect.
        Configuration base = getConf();
        conf = (base != null) ? new Configuration(base) : new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);

        Job job = Job.getInstance(conf, "credit:ndbgBill table");
        job.setJarByClass(ReadHDFSToHbase.class);

        Configuration jobConf = job.getConfiguration();
        // The property names end in "maxattempts"; the singular spelling is not read by Hadoop.
        jobConf.setInt("mapreduce.reduce.maxattempts", 8);
        jobConf.setInt("mapreduce.map.maxattempts", 8);
        jobConf.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.05f);
        jobConf.setInt("mapreduce.map.memory.mb", 2048);
        jobConf.setInt("mapreduce.reduce.memory.mb", 1536);
        jobConf.set("mapreduce.map.java.opts", "-Xms512m -Xmx1024m");
        jobConf.set("mapreduce.reduce.java.opts", "-Xms512m -Xmx1024m");

        if (IS_TEST_CLUSTER) {
            jobConf.set("hbase.zookeeper.quorum",
                    "cdh-test-02.nm-cloud.internal,cdh-test-01.nm-cloud.internal,cdh-test-03.nm-cloud.internal");
        } else {
            jobConf.set("hbase.zookeeper.quorum",
                    "zhpt-bd-coordinate-02.e.189.cn,zhpt-bd-coordinate-04.e.189.cn,zhpt-bd-coordinate-06.e.189.cn");
        }
        jobConf.set("hbase.zookeeper.property.clientPort", "2181");
        jobConf.set("hbase.security.authentication", "kerberos");
        jobConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@E.189.CN");
        TableMapReduceUtil.initCredentials(job);

        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Mapper that parses each input line into a {@link Put} and writes it to HBase in batches
     * of {@link #BATCH_SIZE}. Records without a row key or without any column are skipped and
     * counted in the {@code ReadHDFSToHbase} counter group.
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        /** Number of buffered puts that triggers a write to HBase. */
        private static final int BATCH_SIZE = 200;

        /** Column family every cell is written to. */
        private static final byte[] COLUMN_FAMILY = Bytes.toBytes("cf");

        /** Counter group used to report skipped records. */
        private static final String COUNTER_GROUP = "ReadHDFSToHbase";

        /** Puts waiting to be written; per-instance so mappers sharing a JVM do not interfere. */
        private final List<Put> puts = new ArrayList<Put>(BATCH_SIZE);

        @SuppressWarnings("deprecation")
        private HConnection connection;

        @SuppressWarnings("deprecation")
        private HTableInterface table;

        /**
         * Opens the HBase connection and table handle used for the lifetime of this task.
         *
         * @throws IOException if the connection or the table cannot be opened
         */
        @SuppressWarnings("deprecation")
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration cfg = context.getConfiguration();
            connection = HConnectionManager.createConnection(cfg);
            try {
                table = connection.getTable(TABLE_NAME);
            } catch (IOException | RuntimeException e) {
                // cleanup() is not invoked when setup() fails, so release the connection here.
                closeConnection();
                throw e;
            }
        }

        /**
         * Converts one input line into a {@link Put} and buffers it, flushing the buffer once
         * it holds {@link #BATCH_SIZE} entries.
         *
         * @param key     byte offset of the line (unused)
         * @param value   the input line: {@code rowKey|qualifier:value|...}
         * @param context task context, used only for counters
         * @throws IOException if writing a full batch to HBase fails
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (line.isEmpty()) {
                return;
            }

            String[] segments = line.split("\\|");
            // split() yields an empty array for lines made only of delimiters.
            String rowKey = segments.length > 0 ? segments[0].trim() : "";
            if (rowKey.isEmpty()) {
                // HBase rejects zero-length row keys; skip instead of failing the task.
                context.getCounter(COUNTER_GROUP, "SKIPPED_NO_ROW_KEY").increment(1);
                return;
            }

            // A map keeps the last value when a qualifier appears more than once in a line.
            HashMap<String, String> columns = new HashMap<>();
            for (int i = 1; i < segments.length; i++) {
                // Limit 2: only the first ':' separates qualifier from value, so values that
                // themselves contain ':' (timestamps, URLs) are stored in full.
                String[] pair = segments[i].split(":", 2);
                String cellValue = pair.length > 1 ? pair[1] : "";
                columns.put(pair[0], cellValue);
            }
            if (columns.isEmpty()) {
                // A Put without columns is rejected by HBase and would fail the whole batch.
                context.getCounter(COUNTER_GROUP, "SKIPPED_NO_COLUMNS").increment(1);
                return;
            }

            Put put = new Put(Bytes.toBytes(rowKey));
            for (Entry<String, String> column : columns.entrySet()) {
                put.addColumn(COLUMN_FAMILY, Bytes.toBytes(column.getKey()), Bytes.toBytes(column.getValue()));
            }
            puts.add(put);
            if (puts.size() >= BATCH_SIZE) {
                flushPuts();
            }
        }

        /**
         * Writes any remaining buffered puts and releases the table and connection.
         *
         * @throws IOException if the final batch cannot be written
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            try {
                if (table != null) {
                    flushPuts();
                }
            } finally {
                closeTable();
                closeConnection();
            }
        }

        /** Writes all buffered puts to the table and empties the buffer. */
        private void flushPuts() throws IOException {
            if (!puts.isEmpty()) {
                table.put(puts);
                puts.clear();
            }
        }

        /** Closes the table handle; failures are reported on stderr but do not fail the task. */
        private void closeTable() {
            if (table == null) {
                return;
            }
            try {
                table.close();
            } catch (Exception e) {
                System.err.println("Failed to close HBase table " + TABLE_NAME + ": " + e);
            } finally {
                table = null;
            }
        }

        /** Closes the connection; failures are reported on stderr but do not fail the task. */
        private void closeConnection() {
            if (connection == null) {
                return;
            }
            try {
                connection.close();
            } catch (Exception e) {
                System.err.println("Failed to close HBase connection: " + e);
            } finally {
                connection = null;
            }
        }
    }
}