立即注册 登录
About云-梭伦科技 返回首页

夜空的个人空间 https://aboutyun.com/?45784 [收藏] [复制] [分享] [RSS]

日志

Mapreduce之计算取值读取多层路径

热度 1已有 1138 次阅读2017-1-17 14:58 |个人分类:mapreduce| mapreduce, 读取多层文件

代码:读取多层文件,有些文件可能为空,统计不为空文件中特定用户的个数
/**
 * Copyright (c) 2017 21CN.COM . All rights reserved.
 *
 * Description: HBaseTo
 * 
 * <pre>
 * Modified log:
 * ------------------------------------------------------
 * Ver. Date Author Description
 * ------------------------------------------------------
 * 1.0 2017年1月17日 love created.
 * </pre>
 */
package com.BillCount;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.google.common.base.Strings;

public class BillwordCount {

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {

        private static Text line = new Text();// 每行数据

        private static IntWritable vlue = new IntWritable(1);

        private Text ky1 = new Text();

        public void map(Object key, Text value, Context context)

        throws IOException, InterruptedException {

            String line = value.toString();

            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            Path path = fileSplit.getPath();
            String filename = fileSplit.getPath().getParent().getName();
            // System.out.println("fileName:" + filename);
            // System.out.println("Path:" + path.toString());

            ky1.set(path.toString());
            if (Strings.nullToEmpty(line.trim().toString()) != null) {
                if (line.toString().contains("宽带")) {
                    context.write(ky1, vlue);
                }
            }
        }

    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable sum = new IntWritable(1);

        public void reduce(Text key, Iterable<IntWritable> values, Context context)

        throws IOException, InterruptedException {

            int total = 0;
            for (IntWritable val: values) {
                total += val.get();
            }
            sum.set(total);
            context.write(key, sum);
            System.out.println(key + "==========" + sum);
        }
    }

    private static TreeSet<String> aList = new TreeSet<String>();

    private static boolean isTestCluster = false;

    public static void main(String[] args) throws Exception {

        if (args.length != 2) {

            System.err.println("Usage: Data count   <Billin> <Resultout>");

            System.exit(2);

        }
        
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Data count");

        job.getConfiguration().setInt("mapreduce.map.maxattempt", 8);
        job.getConfiguration().setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.05f); // 0.05

        job.getConfiguration().setInt("mapreduce.map.memory.mb", 2048);

        job.getConfiguration().set("mapreduce.map.java.opts", "-Xms512m -Xmx1024m");
        job.getConfiguration().set("mapreduce.reduce.java.opts", "-Xms512m -Xmx1024m");
        job.setJarByClass(BillwordCount.class);

        // 设置Map、Combine和Reduce处理类
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // 设置输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入和输出目录
        FileSystem hdfs = null;
        // Path path = new Path("/home/test/data/data1/bill");
        Path path = new Path(args[0]);
        hdfs = FileSystem.get(conf);
        TreeSet<String> treeSet = iteratorShowFiles(hdfs, path);

        Iterator<String> iterator = treeSet.iterator();
        while (iterator.hasNext()) {
            String wordPath = null;
            wordPath = iterator.next();
            FileInputFormat.addInputPath(job, new Path(wordPath));
        }

        // FileOutputFormat.setOutputPath(job, new Path("/home/test/data/data1/aa.txt"));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true)? 0: 1);
    }

    public static TreeSet<String> iteratorShowFiles(FileSystem hdfs, Path path) {

        try {
            if (hdfs == null || path == null) {

            }
            // 获取文件列表
            FileStatus[] files = hdfs.listStatus(path);

            // 展示文件信息
            for (int i = 0; i < files.length; i++) {
                try {
                    if (files[i].isDirectory()) {

                        iteratorShowFiles(hdfs, files[i].getPath());
                    } else if (files[i].isFile()) {
                        String a = files[i].getPath().toString().trim();

                        aList.add(a);

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return aList;
    }

}

1

路过

雷人

握手

鲜花

鸡蛋

刚表态过的朋友 (1 人)

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条