package com.cn.count;
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class hbaseReMR extends Configured implements Tool {
// Selects which ZooKeeper quorum run() configures for the job: true uses the
// cdh-test-* hosts, false uses the zhpt-bd-coordinate-* hosts.
private static boolean isTestCluster = true;
/**
 * Mapper that reads one source-table row at a time, aggregates the row's
 * "used" and "amount" cells of column family {@code cf}, and writes the
 * aggregate columns for that row straight into the target HBase table in
 * batches. It never calls {@code context.write}.
 */
public static class MapClass extends
        TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    // The default input key/value types are <ImmutableBytesWritable, Result>;
    // the type arguments above are the output key/value types.

    /** Column family that is read from the source row and written to the target row. */
    private static final String FAMILY = "cf";
    private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
    /** Target table used when the job configuration does not name an output table. */
    private static final String DEFAULT_TARGET_TABLE = "ndbgBill";
    /** Number of buffered puts that triggers a write to the target table. */
    private static final int BATCH_SIZE = 200;
    /**
     * Length of the qualifier prefix stored in the month_max_* columns.
     * NOTE(review): presumably a yyyyMM month prefix - confirm against the
     * source table's qualifier layout.
     */
    private static final int MONTH_PREFIX_LENGTH = 6;

    /** Puts waiting to be written; owned by this mapper instance only. */
    private final List<Put> puts = new ArrayList<Put>();
    /** Formatter for the average monthly fee column. */
    private final DecimalFormat averageFormat = new DecimalFormat("###.00");
    @SuppressWarnings("deprecation")
    private HConnection connection;
    @SuppressWarnings("deprecation")
    private HTableInterface table;

    /** Running sum, sample count and maximum (with its month) of one measure. */
    private static final class MonthlyTotals {
        float sum;
        int count;
        float max;
        String maxMonth;

        /** Adds one sample; the first month that reaches the maximum is kept. */
        void add(float value, String month) {
            sum += value;
            if (count == 0 || max < value) {
                max = value;
                maxMonth = month;
            }
            count++;
        }

        boolean hasData() {
            return count > 0;
        }
    }

    /**
     * Opens the connection and the target table used for the direct writes.
     * The table name is taken from {@code TableOutputFormat.OUTPUT_TABLE} in
     * the job configuration and falls back to {@code ndbgBill}.
     *
     * @param context mapper context supplying the job configuration
     * @throws IOException if the connection or the table cannot be opened
     */
    @SuppressWarnings("deprecation")
    @Override
    protected void setup(
            Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, ImmutableBytesWritable>.Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        Configuration cfg = context.getConfiguration();
        connection = HConnectionManager.createConnection(cfg);
        // table = connection.getTable("ndbgBillRe");
        table = connection.getTable(cfg.get(TableOutputFormat.OUTPUT_TABLE,
                DEFAULT_TARGET_TABLE));
    }

    /**
     * Aggregates one source row and buffers the resulting put.
     *
     * <p>Cells whose qualifier contains "used" hold {@code flow|voice};
     * cells whose qualifier contains "amount" hold
     * {@code productName|fee|flowOver|voiceOver}. Missing trailing fields
     * are ignored. Rows that yield no output column are skipped.
     *
     * @param row     key of the scanned row (unused; the key is read from {@code values})
     * @param values  cells of the scanned row
     * @param context mapper context (unused; nothing is emitted)
     * @throws IOException           if writing a full batch to the target table fails
     * @throws NumberFormatException if a numeric field cannot be parsed
     */
    @SuppressWarnings("deprecation")
    @Override
    public void map(ImmutableBytesWritable row, Result values,
            Context context) throws IOException, InterruptedException {
        if (values == null || values.isEmpty()) {
            return;
        }

        MonthlyTotals flow = new MonthlyTotals();
        MonthlyTotals voice = new MonthlyTotals();
        MonthlyTotals fee = new MonthlyTotals();
        String productName = null;
        int flowOverTotal = 0;
        int voiceOverTotal = 0;

        for (KeyValue kv : values.raw()) {
            if (!FAMILY.equals(Bytes.toString(kv.getFamily()))) {
                continue;
            }
            String qualifier = Bytes.toString(kv.getQualifier());
            boolean usageCell = qualifier.contains("used");
            boolean amountCell = qualifier.contains("amount");
            if (!usageCell && !amountCell) {
                continue;
            }
            String[] fields = Bytes.toString(kv.getValue()).split("\\|");

            if (usageCell) {
                if (fields.length > 0) {
                    flow.add(Float.parseFloat(fields[0]), monthOf(qualifier));
                }
                if (fields.length > 1) {
                    voice.add(Float.parseFloat(fields[1]), monthOf(qualifier));
                }
            }
            // Deliberately not "else": a qualifier containing both markers
            // is processed by both branches.
            if (amountCell) {
                if (fields.length > 0) {
                    productName = fields[0];
                }
                if (fields.length > 1) {
                    fee.add(Float.parseFloat(fields[1]), monthOf(qualifier));
                }
                if (fields.length > 2) {
                    flowOverTotal += Integer.parseInt(fields[2]);
                }
                if (fields.length > 3) {
                    voiceOverTotal += Integer.parseInt(fields[3]);
                }
            }
        }

        // An empty Put is rejected by the client, so skip rows with no output.
        if (!flow.hasData() && !voice.hasData() && !fee.hasData()
                && productName == null) {
            return;
        }

        // Use the row key bytes as scanned instead of parsing KeyValue.toString().
        Put put = new Put(values.getRow());
        if (flow.hasData()) {
            addColumn(put, "flow", String.valueOf(flow.sum));
            addColumn(put, "month_max_flow", flow.maxMonth);
            addColumn(put, "max_flow", String.valueOf(flow.max));
        }
        if (voice.hasData()) {
            addColumn(put, "voice", String.valueOf(voice.sum));
            addColumn(put, "month_max_voice", voice.maxMonth);
            addColumn(put, "max_voice", String.valueOf(voice.max));
        }
        if (fee.hasData()) {
            addColumn(put, "averageMonthFee",
                    averageFormat.format(fee.sum / fee.count));
            addColumn(put, "MonthFee", String.valueOf(fee.sum));
            addColumn(put, "month_max_fee", fee.maxMonth);
            addColumn(put, "max_fee", String.valueOf(fee.max));
        }
        if (productName != null) {
            addColumn(put, "product_name", productName);
            addColumn(put, "flow_super", String.valueOf(flowOverTotal));
            addColumn(put, "voice_super", String.valueOf(voiceOverTotal));
        }

        puts.add(put);
        if (puts.size() >= BATCH_SIZE) {
            flushPuts();
        }
    }

    /**
     * Writes any remaining buffered puts, then closes the table and the
     * connection. Both are closed even when the final write fails; close
     * failures are reported on stderr and do not fail the task.
     *
     * @param context mapper context (unused)
     * @throws IOException if the final write fails
     */
    @SuppressWarnings("deprecation")
    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        try {
            flushPuts();
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (Exception e) {
                    // Best effort: the data has already been written.
                    System.err.println("failed to close table: " + e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    System.err.println("failed to close connection: " + e);
                }
            }
        }
    }

    /** Returns the leading characters of a qualifier that are stored as its month. */
    private static String monthOf(String qualifier) {
        return qualifier.substring(0, MONTH_PREFIX_LENGTH);
    }

    /** Adds one string-valued column of family {@code cf} to the put. */
    @SuppressWarnings("deprecation")
    private static void addColumn(Put put, String qualifier, String value) {
        put.add(FAMILY_BYTES, Bytes.toBytes(qualifier), Bytes.toBytes(value));
    }

    /** Writes all buffered puts to the target table and empties the buffer. */
    @SuppressWarnings("deprecation")
    private void flushPuts() throws IOException {
        if (puts.isEmpty()) {
            return;
        }
        table.put(puts);
        puts.clear();
    }
}
/**
 * Configures and runs the HBase-to-HBase aggregation job and waits for it.
 *
 * <p>At submission time the input format is TableInputFormat and the output
 * format is TableOutputFormat; TableMapReduceUtil is used to simplify that
 * wiring. The job is map-only because MapClass writes to the target table
 * itself and emits no records.
 *
 * @param args command-line arguments left after generic option parsing (unused)
 * @return 0 if the job completed successfully, 1 otherwise
 * @throws Exception if the job cannot be configured or submitted
 */
@SuppressWarnings("deprecation")
@Override
public int run(String[] args) throws Exception {
    // NOTE(review): source "ndbgBil" and target "ndbgBill" differ by one
    // letter - confirm the source table name is not a typo.
    String tablename = "ndbgBil";
    String targettablename = "ndbgBill";

    // Start from the configuration ToolRunner populated (generic options such
    // as -D), copying it so the Tool's own configuration is left untouched.
    Configuration toolConf = getConf();
    Configuration conf = (toolConf == null)
            ? new Configuration() : new Configuration(toolConf);
    conf.set(TableInputFormat.INPUT_TABLE, tablename);
    conf.set(TableOutputFormat.OUTPUT_TABLE, targettablename);
    conf.setInt("hbase.client.scanner.caching", 1000);

    Job job = new Job(conf, "HBaseBill mapreduce");
    Configuration jobConf = job.getConfiguration();

    // Task retry and memory settings.
    jobConf.setInt("mapreduce.reduce.maxattempts", 8);
    jobConf.setInt("mapreduce.map.maxattempts", 8);
    jobConf.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.05f);
    jobConf.setInt("mapreduce.map.memory.mb", 2048);
    jobConf.setInt("mapreduce.reduce.memory.mb", 1536);
    jobConf.set("mapreduce.map.java.opts", "-Xms512m -Xmx1024m");
    jobConf.set("mapreduce.reduce.java.opts", "-Xms512m -Xmx1024m");

    // Cluster location.
    if (isTestCluster) {
        jobConf.set("hbase.zookeeper.quorum",
                "cdh-test-02.nm-cloud.internal,cdh-test-01.nm-cloud.internal,cdh-test-03.nm-cloud.internal");
    } else {
        jobConf.set("hbase.zookeeper.quorum",
                "zhpt-bd-coordinate-02.e.189.cn,zhpt-bd-coordinate-04.e.189.cn,zhpt-bd-coordinate-06.e.189.cn");
    }
    jobConf.set("hbase.zookeeper.property.clientPort", "2181");
    // NOTE(review): the root dir and the Kerberos principals below look
    // specific to the test cluster but are applied regardless of
    // isTestCluster - confirm before running with isTestCluster = false.
    jobConf.set("hbase.rootdir", "hdfs://21cnCDHTest/hbase");

    // Security settings.
    jobConf.set("hadoop.security.authorization", "true");
    jobConf.set("hadoop.security.authentication", "kerberos");
    jobConf.set("hbase.rpc.engine", "org.apache.hadoop.hbase.ipc.SecureRpcEngine");
    jobConf.set("hbase.security.authorization", "true");
    jobConf.set("hbase.security.authentication", "kerberos");
    jobConf.set("hbase.master.kerberos.principal", "hbase/_HOST@NM-CLOUD.INTERNAL");
    jobConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@NM-CLOUD.INTERNAL");

    job.setJarByClass(hbaseReMR.class);
    job.setMapperClass(MapClass.class);

    Scan scan = new Scan();
    scan.setMaxVersions(1);
    scan.setCaching(1000);
    // A full-table scan should not evict hot data from the block cache.
    scan.setCacheBlocks(false);

    TableMapReduceUtil.initTableMapperJob(tablename,
            scan, // the scan is passed to TableInputFormat and limits what MapClass sees
            MapClass.class, ImmutableBytesWritable.class,
            ImmutableBytesWritable.class, job);
    TableMapReduceUtil.initTableReducerJob(targettablename,
            null,
            job);
    // No reducer class is configured and the mapper emits nothing, so skip
    // the shuffle and the idle default reducer.
    job.setNumReduceTasks(0);
    TableMapReduceUtil.initCredentials(job);

    // Return the status instead of exiting so the caller decides what to do.
    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Command-line entry point: runs the job through ToolRunner and exits the
 * JVM with the status the tool returned.
 *
 * @param args command-line arguments, including Hadoop generic options
 * @throws Exception if the tool fails with an exception
 */
public static void main(String[] args) throws Exception {
    System.err.println("start run job .....");
    int res = ToolRunner.run(new Configuration(), new hbaseReMR(), args);
    // Report completion before exiting; nothing after System.exit executes.
    System.err.println("run job finished, exit code " + res);
    System.exit(res);
}
}