/**
* Copyright (c) 2017 21CN.COM . All rights reserved.
*
* Description: MapReduce job that counts, per input file, the lines containing "宽带".
*
* <pre>
* Modified log:
* ------------------------------------------------------
* Ver. Date Author Description
* ------------------------------------------------------
* 1.0 2017-01-17 love created.
* </pre>
*/
package com.BillCount;
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.google.common.base.Strings;
public class BillwordCount {
public static class Map extends Mapper<Object, Text, Text, IntWritable> {
private static Text line = new Text();// 每行数据
private static IntWritable vlue = new IntWritable(1);
private Text ky1 = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
FileSplit fileSplit = (FileSplit) context.getInputSplit();
Path path = fileSplit.getPath();
String filename = fileSplit.getPath().getParent().getName();
// System.out.println("fileName:" + filename);
// System.out.println("Path:" + path.toString());
ky1.set(path.toString());
if (Strings.nullToEmpty(line.trim().toString()) != null) {
if (line.toString().contains("宽带")) {
context.write(ky1, vlue);
}
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable sum = new IntWritable(1);
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int total = 0;
for (IntWritable val: values) {
total += val.get();
}
sum.set(total);
context.write(key, sum);
System.out.println(key + "==========" + sum);
}
}
private static TreeSet<String> aList = new TreeSet<String>();
private static boolean isTestCluster = false;
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: Data count <Billin> <Resultout>");
System.exit(2);
}
Configuration conf = new Configuration();
Job job = new Job(conf, "Data count");
job.getConfiguration().setInt("mapreduce.map.maxattempt", 8);
job.getConfiguration().setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.05f); // 0.05
job.getConfiguration().setInt("mapreduce.map.memory.mb", 2048);
job.getConfiguration().set("mapreduce.map.java.opts", "-Xms512m -Xmx1024m");
job.getConfiguration().set("mapreduce.reduce.java.opts", "-Xms512m -Xmx1024m");
job.setJarByClass(BillwordCount.class);
// 设置Map、Combine和Reduce处理类
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
// 设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入和输出目录
FileSystem hdfs = null;
// Path path = new Path("/home/test/data/data1/bill");
Path path = new Path(args[0]);
hdfs = FileSystem.get(conf);
TreeSet<String> treeSet = iteratorShowFiles(hdfs, path);
Iterator<String> iterator = treeSet.iterator();
while (iterator.hasNext()) {
String wordPath = null;
wordPath = iterator.next();
FileInputFormat.addInputPath(job, new Path(wordPath));
}
// FileOutputFormat.setOutputPath(job, new Path("/home/test/data/data1/aa.txt"));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)? 0: 1);
}
public static TreeSet<String> iteratorShowFiles(FileSystem hdfs, Path path) {
try {
if (hdfs == null || path == null) {
}
// 获取文件列表
FileStatus[] files = hdfs.listStatus(path);
// 展示文件信息
for (int i = 0; i < files.length; i++) {
try {
if (files[i].isDirectory()) {
iteratorShowFiles(hdfs, files[i].getPath());
} else if (files[i].isFile()) {
String a = files[i].getPath().toString().trim();
aList.add(a);
}
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
return aList;
}
}