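// Scenario: read records from HDFS files, process them, and write the results to HBase.
// Command: hadoop jar NdbgMR.jar com.cn.count.HBaseMR 201602 /user/***/part-*
//   Arguments: <dataMonth> <input path>. NOTE: the main class defined in this file is
//   com.cn.count.BillToOne; use that class name if this command is meant to run this job.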
package com.cn.count;
/**
* Copyright (c) 2016***. All rights reserved.
*
* Description: ndbgBill
*
* <pre>
* Modified log:
* ------------------------------------------------------
* Ver. Date Author Description
* ------------------------------------------------------
* 1.0 2016年12月20日 zgr created.
* </pre>
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class BillToOne extends Configured implements Tool {
private static HTableDescriptor htd;
private static String C_dateMOnth = "c_month";
// public static Configuration conf = new Configuration();
private static boolean isTestCluster = true;
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
// TODO Auto-generated method stub
if (args.length < 2) {
System.err.println("Usage: BillToHbase <dataMonth> <pathin>");
System.exit(1);
}
System.err.println("start run job .....");
try {
ToolRunner.run(new BillToOne(), args);
} catch (Exception e) {
e.printStackTrace();
}
System.err.println("stop run job .....");
}
public int run(String[] args) throws Exception {
String tablename = "ndbgBill";
Configuration conf = new Configuration();
conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
conf.set(C_dateMOnth, args[0]);
Job job = new Job(conf, "ndbgBill table");
job.setJarByClass(BillToOne.class);
// job.setNumReduceTasks(3);
// conf.set(C_dateMOnth, "201606");
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%"+args[0]);
job.getConfiguration().setInt("mapreduce.reduce.maxattempt", 8);
job.getConfiguration().setInt("mapreduce.map.maxattempt", 8);
job.getConfiguration().setFloat(
"mapreduce.reduce.shuffle.memory.limit.percent", 0.05f); // 0.05
job.getConfiguration().setInt("mapreduce.map.memory.mb", 2048);
job.getConfiguration().setInt("mapreduce.reduce.memory.mb", 1536);
job.getConfiguration().set("mapreduce.map.java.opts",
"-Xms512m -Xmx1024m");
job.getConfiguration().set("mapreduce.reduce.java.opts",
"-Xms512m -Xmx1024m");
// job.setNumReduceTasks(0);
if (isTestCluster) {
job.getConfiguration()
.set("hbase.zookeeper.quorum",
"cdh-test-02.nm-cloud.internal,cdh-test-01.nm-cloud.internal,cdh-test-03.nm-cloud.internal");
} else {
job.getConfiguration()
.set("hbase.zookeeper.quorum",
"zhpt-bd-coordinate-02.e.189.cn,zhpt-bd-coordinate-04.e.189.cn,zhpt-bd-coordinate-06.e.189.cn");
}
job.getConfiguration().set("hbase.zookeeper.property.clientPort",
"2181");
job.getConfiguration().set("hbase.security.authentication", "kerberos");
job.getConfiguration().set("hbase.regionserver.kerberos.principal",
"hbase/_HOST@E.189.CN");
TableMapReduceUtil.initCredentials(job);
// createHBaseTable(tablename);
job.setMapperClass(Map.class);
// job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
// job.setOutputFormatClass(NullOutputFormat.class);
// 设置输入目录
FileInputFormat.addInputPath(job, new Path(args[1]));
// FileInputFormat.addInputPath(job, new Path("hdfs://centos:9000/user/dataA/input3/*"));
return job.waitForCompletion(true) ? 0 : 1;
}
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
private final static IntWritable one = new IntWritable();
private static List<Put> puts = new ArrayList<Put>();
private HTableInterface table;
private String dateMOnth;
@SuppressWarnings("deprecation")
@Override
protected void setup(
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
super.setup(context);
Configuration cfg = context.getConfiguration();
HBaseAdmin admin=new HBaseAdmin(cfg);
@SuppressWarnings({ "deprecation", "deprecation" })
HConnection connection = HConnectionManager.createConnection(cfg);
table = connection.getTable("ndbgBill");
this.table.setWriteBufferSize(200); // 设置writebuffersize,当write
// buffer写满的时候才会flush
this.table.setAutoFlushTo(false); // 禁止自动刷写
// dateMOnth = conf.get(C_dateMOnth);
dateMOnth = cfg.get(C_dateMOnth);
System.out.println("datamonth======================="+dateMOnth);
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String s = value.toString();
System.out.println(s+"++*********************************************");
String[] strings = s.split("\\|");
if (strings == null || strings.length == 0) {
return;
}
// judge the input file is amount_bill or used_bill
if (strings.length == 6) {// amount_bil
StringBuilder sb = new StringBuilder();
for (int i = 2; i < strings.length; i++) {
if (strings[i].trim().equals("")) {
}else if(i==strings.length-1){
sb.append(strings[i] );
}else{
sb.append(strings[i] + "|");
}
}
String st = new String(sb);
String rowkey = reverse(strings[1]) + dateMOnth;
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("used_bil"),
Bytes.toBytes(st));
if(put.isEmpty()){
System.err.println("1 put is empty");
}
// context.write(new Text(rowkey),new Text(st));
puts.add(put);
table.put(put);
if (puts.size() == 200) {
table.put(puts);
puts.clear();
}
} else if (strings.length == 14) {
StringBuilder sb = new StringBuilder();
for (int i = 1; i < strings.length; i++) {
if (strings[i].trim().equals("")) {
}else if(i==strings.length-1){
sb.append(strings[i] );
}else{
sb.append(strings[i] + "|");
}
}
System.out.println(sb.toString()+"=============");
String st = new String(sb);
String rowkey = reverse(strings[0]) + dateMOnth;
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("amount_bil"),
Bytes.toBytes(st));
table.put(put);
if(put.isEmpty()){
System.err.println("1 put is empty");
}
puts.add(put);
if (puts.size() == 200) {
table.put(puts);
puts.clear();
}
}
}
protected void cleanup(Context context) throws IOException,
InterruptedException {
if (puts != null && puts.size() > 0) {
table.put(puts);
this.puts.clear();
}
if (table != null) {
try {
table.close();
} catch (Exception e) {
}
}
}
public static String reverse(String string) {
String reve = "";
for (int i = 0; i < string.length(); i++) {
reve += string.charAt(string.length() - 1 - i);
}
return reve.toString();
}
}
public static void createHBaseTable(String tablename)
throws IOException {
htd = new HTableDescriptor(tablename);
// create a family 'used_bil' to write used_bill data
HColumnDescriptor col = new HColumnDescriptor("used_bil");
// create a family 'amount_bil' to write amount_bill data
HColumnDescriptor coll = new HColumnDescriptor("amount_bil");
// fill the family to table
htd.addFamily(col);
htd.addFamily(coll);
Configuration cfg = HBaseConfiguration.create();
@SuppressWarnings("resource")
HBaseAdmin admin = new HBaseAdmin(cfg);
if (admin.tableExists(tablename)) {
System.out.println("table exists,trying recreate table!");
// admin.disableTable(tablename);
// admin.deleteTable(tablename);
// admin.createTable(htd);
}else {
System.out.println("create new table:" + tablename);
admin.createTable(htd);
}
}
// @SuppressWarnings("deprecation")
// public static HTableInterface getHtable() throws IOException{
// Configuration cfg = HBaseConfiguration.create();
//
//
// HConnection connection = HConnectionManager.createConnection(cfg);
//
// HTableInterface table = connection.getTable("ndbg_bill");
//
// return table;
// }
public static HTableInterface getHtable() throws IOException{
Configuration cfg = HBaseConfiguration.create();
HConnection connection = HConnectionManager.createConnection(cfg);
HTableInterface table = connection.getTable("ndbg_bill");
return table;
}
}