Some problems I ran into using PathFilter in Hadoop

S|C posted on 2016-8-22 13:33:56
Last edited by S|C on 2016-8-23 09:33
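Guiding questions:

1. How do you filter files when analyzing data with Hadoop?
2. What should you watch out for when configuring the input path for a Hadoop file filter?
3. Does the filtered input path support regular expressions?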




Below I share some problems I ran into while using PathFilter in an MR program.

Business scenario

A given directory in HDFS holds many files. The data collectors generate them every hour and upload them to HDFS. The listing below shows one day of data:
[mw_shl_code=shell,true]-rw-r--r--   3 hadoop supergroup     383638 2016-08-21 01:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.0
-rw-r--r--   3 hadoop supergroup     282175 2016-08-21 02:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.1
-rw-r--r--   3 hadoop supergroup     921844 2016-08-21 11:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.10
-rw-r--r--   3 hadoop supergroup     787638 2016-08-21 12:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.11
-rw-r--r--   3 hadoop supergroup     619239 2016-08-21 13:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.12
-rw-r--r--   3 hadoop supergroup     776499 2016-08-21 14:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.13
-rw-r--r--   3 hadoop supergroup     959702 2016-08-21 15:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.14
-rw-r--r--   3 hadoop supergroup     932658 2016-08-21 16:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.15
-rw-r--r--   3 hadoop supergroup     958502 2016-08-21 17:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.16
-rw-r--r--   3 hadoop supergroup     828683 2016-08-21 18:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.17
-rw-r--r--   3 hadoop supergroup    1084602 2016-08-21 19:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.18
-rw-r--r--   3 hadoop supergroup    1090640 2016-08-21 20:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.19
-rw-r--r--   3 hadoop supergroup     191149 2016-08-21 03:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.2
-rw-r--r--   3 hadoop supergroup     712515 2016-08-21 21:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.20
-rw-r--r--   3 hadoop supergroup     955075 2016-08-21 22:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.21
-rw-r--r--   3 hadoop supergroup    1006990 2016-08-21 23:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.22
-rw-r--r--   3 hadoop supergroup     581031 2016-08-22 00:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.23
-rw-r--r--   3 hadoop supergroup     355898 2016-08-21 04:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.3
-rw-r--r--   3 hadoop supergroup     402907 2016-08-21 05:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.4
-rw-r--r--   3 hadoop supergroup     172758 2016-08-21 06:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.5
-rw-r--r--   3 hadoop supergroup     300429 2016-08-21 07:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.6
-rw-r--r--   3 hadoop supergroup     358782 2016-08-21 08:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.7
-rw-r--r--   3 hadoop supergroup     323841 2016-08-21 09:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.8
-rw-r--r--   3 hadoop supergroup    1066597 2016-08-21 10:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.9
-rw-r--r--   3 hadoop supergroup     266572 2016-08-21 01:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.0
-rw-r--r--   3 hadoop supergroup     141310 2016-08-21 02:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.1
-rw-r--r--   3 hadoop supergroup     282398 2016-08-21 11:02 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.10
-rw-r--r--   3 hadoop supergroup     319551 2016-08-21 12:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.11
-rw-r--r--   3 hadoop supergroup     391468 2016-08-21 13:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.12
-rw-r--r--   3 hadoop supergroup     330251 2016-08-21 14:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.13
-rw-r--r--   3 hadoop supergroup     224677 2016-08-21 15:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.14
-rw-r--r--   3 hadoop supergroup     355869 2016-08-21 16:02 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.15
-rw-r--r--   3 hadoop supergroup     364699 2016-08-21 17:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.16
-rw-r--r--   3 hadoop supergroup     437244 2016-08-21 18:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.17
-rw-r--r--   3 hadoop supergroup     840551 2016-08-21 19:09 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.18
-rw-r--r--   3 hadoop supergroup     264048 2016-08-21 20:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.19
-rw-r--r--   3 hadoop supergroup     121042 2016-08-21 03:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.2
-rw-r--r--   3 hadoop supergroup     613788 2016-08-21 21:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.20
-rw-r--r--   3 hadoop supergroup     538134 2016-08-21 22:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.21
-rw-r--r--   3 hadoop supergroup     355747 2016-08-21 23:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.22
-rw-r--r--   3 hadoop supergroup     568559 2016-08-22 00:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.23
-rw-r--r--   3 hadoop supergroup      91631 2016-08-21 04:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.3
-rw-r--r--   3 hadoop supergroup      96407 2016-08-21 05:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.4
-rw-r--r--   3 hadoop supergroup     154202 2016-08-21 06:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.5
-rw-r--r--   3 hadoop supergroup     169538 2016-08-21 07:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.6
-rw-r--r--   3 hadoop supergroup     441323 2016-08-21 08:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.7
-rw-r--r--   3 hadoop supergroup     636955 2016-08-21 09:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.8
-rw-r--r--   3 hadoop supergroup     294494 2016-08-21 10:02 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.9[/mw_shl_code]

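The requirement: at 2 minutes past every hour, read the previous hour's data.
For example, at 18:02 we need the data files whose names end in 17. Hadoop provides org.apache.hadoop.fs.PathFilter, so I tried using it
together with a regular expression to select the files we want.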
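The file to read is identified by the date and the hour of the previous hour, written as `<yyyy-MM-dd>.<hour>` at the end of each name in the listing. The hour is not zero-padded. Below is a minimal sketch that derives both parts from a single timestamp with java.time. A run at 00:02 then picks up hour 23 of the *previous* day. The class and method names are hypothetical and not part of the original job.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: the file-name suffix of the previous hour's data file. */
final class PreviousHourSuffix {
    private static final DateTimeFormatter DT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    private PreviousHourSuffix() {}

    /**
     * Returns "<yyyy-MM-dd>.<hour>" for the hour before {@code now},
     * e.g. 2016-08-21 18:02 gives "2016-08-21.17".
     */
    static String suffix(LocalDateTime now) {
        // minusHours rolls back across midnight, changing the date as well as the hour
        LocalDateTime prev = now.minusHours(1);
        // hours are not zero-padded in the file names (".0" ... ".23")
        return prev.format(DT) + "." + prev.getHour();
    }
}
```

Deriving the date and the hour together avoids pairing "hour 23" with the wrong day. In this post the date is passed on the command line instead, so whether this matters depends on how the job is scheduled.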

Main code:
[mw_shl_code=java,true]
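    // Excerpt from the driver class; needs java.util.Calendar, org.apache.hadoop.conf.Configured,
    // org.apache.hadoop.fs.Path and org.apache.hadoop.fs.PathFilter to be imported.
    /**
     * Configuration key under which the driver stores the hour to read
     */
    static final String WEBPV_HOUR_FILTER_REGEX = "webpv.hour.filter.regex";
    /**
     * Regex for the hourly file name, without the trailing hour
     */
    static final String WEBPV_HOUR_FILTER_REGEX_PATTERN_PRE = "web_pv\\.\\d{1,3}\\.[0-9]{4}-[0-9]{2}-[0-9]{2}\\.";
    /**
     * PathFilter for the hourly webpv files
     * @author qiankun.li
     *
     */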
    public static class WebPvPathHourFilter extends Configured implements PathFilter{
        public WebPvPathHourFilter() {}

        @Override
        public boolean accept(Path path) {
            int hour = getConf().getInt(WEBPV_HOUR_FILTER_REGEX, getPreHour());
            String pathValue = path.toString();
            return pathValue.matches(WEBPV_HOUR_FILTER_REGEX_PATTERN_PRE+hour);
        }

    }

    /** Returns the hour of day (0-23) one hour before now. */
    public static int getPreHour(){
        Calendar now = Calendar.getInstance();
        // add() rolls back across midnight, so at 00:xx this yields 23
        now.add(Calendar.HOUR_OF_DAY, -1);
        return now.get(Calendar.HOUR_OF_DAY);
    }
[/mw_shl_code]
Setting the runtime variable and the filter:
[mw_shl_code=java,true]
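int hour = getPreHour(); // default: read the previous hour
job.getConfiguration().setInt(WEBPV_HOUR_FILTER_REGEX, hour);
// FileInputFormat creates the filter via ReflectionUtils.newInstance with the job conf,
// so getConf() is available inside accept()
FileInputFormat.setInputPathFilter(job, WebPvPathHourFilter.class);[/mw_shl_code]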

With the code in place, build the jar and run it with the following Linux command:
[mw_shl_code=shell,true]
bin/hadoop jar ~/jar/hadoop_task.jar com.**.hadoop.mr.online.WebChannelPvNews -libjars /home/hadoop/jar/guava-r09.jar,/home/hadoop/jar/mysql-connector-java-5.1.34.jar /hive/uac/dt=2016-08-21/channel=web/pv 2016-08-21[/mw_shl_code]

Result:
[mw_shl_code=java,true]
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:243)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:269)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:452)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:469)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:366)
    at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1218)
    at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1215)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1215)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1236)
    at com.howbuy.hadoop.mr.online.WebChannelPvNews.run(WebChannelPvNews.java:286)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.howbuy.hadoop.mr.online.WebChannelPvNews.main(WebChannelPvNews.java:293)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:208)[/mw_shl_code]

The code throws an exception. Why? Is the regular expression wrong? First I tried printing the path inside the filter:
[mw_shl_code=java,true]
@Override
public boolean accept(Path path) {
    int hour = getConf().getInt(WEBPV_HOUR_FILTER_REGEX, getPreHour());
    String pathValue = path.toString();
    System.out.println(pathValue); // debug: print every path the filter is asked about
    return pathValue.matches(WEBPV_HOUR_FILTER_REGEX_PATTERN_PRE+hour);
}[/mw_shl_code]

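Running it again gives the same result as above. Next, look at the source at the location named in the stack trace (FileInputFormat.listStatus):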
[mw_shl_code=java,true]
/** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if zero items.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i=0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p)); // exception
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files")); // exception
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            for(FileStatus stat: fs.listStatus(globStat.getPath(),
                inputFilter)) {
              result.add(stat);
            }         
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }
[/mw_shl_code]
Inside fs.globStatus(p, inputFilter), the following code is called:
[mw_shl_code=java,true]
@SuppressWarnings("unchecked")
  private List<FileStatus> globStatusInternal(Path pathPattern,
      PathFilter filter) throws IOException {
    boolean patternHasGlob = false;       // pathPattern has any globs
    List<FileStatus> matches = new ArrayList<FileStatus>();

    // determine starting point
    int level = 0;
    String baseDir = Path.CUR_DIR;
    if (pathPattern.isAbsolute()) {
      level = 1; // need to skip empty item at beginning of split list
      baseDir = Path.SEPARATOR;
    }

    // parse components and determine if it's a glob
    String[] components = null;
    GlobFilter[] filters = null;
    String filename = pathPattern.toUri().getPath();
    if (!filename.isEmpty() && !Path.SEPARATOR.equals(filename)) {
      components = filename.split(Path.SEPARATOR);
      filters = new GlobFilter[components.length];
      for (int i=level; i < components.length; i++) {
        filters[i] = new GlobFilter(components[i]);
        // does this component contain a wildcard? if so, patternHasGlob becomes true
        patternHasGlob |= filters[i].hasPattern();
      }
      if (!patternHasGlob) {
        baseDir = unquotePathComponent(filename);
        components = null; // short through to filter check
      }
    }

    // seed the parent directory path, return if it doesn't exist
    try {
      matches.add(getFileStatus(new Path(baseDir)));
    } catch (FileNotFoundException e) {
      return patternHasGlob ? matches : null;
    }

    // skip if there are no components other than the basedir
    if (components != null) {
      // iterate through each path component
      for (int i=level; (i < components.length) && !matches.isEmpty(); i++) {
        List<FileStatus> children = new ArrayList<FileStatus>();
        for (FileStatus match : matches) {
          // don't look for children in a file matched by a glob
          if (!match.isDirectory()) {
            continue;
          }
          try {
            if (filters[i].hasPattern()) {
              // get all children matching the filter
              FileStatus[] statuses = listStatus(match.getPath(), filters[i]);
              children.addAll(Arrays.asList(statuses));
            } else {
              // the component does not have a pattern
              String component = unquotePathComponent(components[i]);
              Path child = new Path(match.getPath(), component);
              children.add(getFileStatus(child));
            }
          } catch (FileNotFoundException e) {
            // don't care
          }
        }
        matches = children;
      }
    }
    // remove anything that didn't match the filter
    if (!matches.isEmpty()) {
      Iterator<FileStatus> iter = matches.iterator();
      while (iter.hasNext()) {
        if (!filter.accept(iter.next().getPath())) {
          iter.remove();
        }
      }
    }
    // no final paths, if there were any globs return empty list
    if (matches.isEmpty()) {
        // no wildcard in the pattern: return null, which is exactly what triggers the exception
        return patternHasGlob ? matches : null;
    }
    Collections.sort(matches);
    return matches;
  }[/mw_shl_code]
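Both error paths can be reproduced without a cluster. Below is a heavily simplified, hypothetical model of how globStatusInternal and listStatus fit together. It uses plain strings instead of Path/FileStatus and supports only two input shapes: a bare directory or `<dir>/*`. It also assumes the user filter is the only filter, whereas the real code also applies hiddenFileFilter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical, simplified model of globStatusInternal plus listStatus's error branches. */
final class GlobModel {
    private GlobModel() {}

    /** Returns null or a (possibly empty) list, mirroring globStatusInternal's contract. */
    static List<String> globStatus(String input, List<String> filesInDir, Predicate<String> filter) {
        boolean patternHasGlob = input.endsWith("/*");
        List<String> matches = new ArrayList<>();
        if (patternHasGlob) {
            matches.addAll(filesInDir);   // "*" expands to every child of the directory
        } else {
            matches.add(input);           // no glob: the only candidate is the path itself
        }
        // the user PathFilter is applied to every remaining candidate
        matches.removeIf(p -> !filter.test(p));
        if (matches.isEmpty()) {
            return patternHasGlob ? matches : null;
        }
        return matches;
    }

    /** Mirrors the two error messages of FileInputFormat.listStatus; "OK" otherwise. */
    static String describe(String input, List<String> filesInDir, Predicate<String> filter) {
        List<String> m = globStatus(input, filesInDir, filter);
        if (m == null) {
            return "Input path does not exist: " + input;
        }
        if (m.isEmpty()) {
            return "Input Pattern " + input + " matches 0 files";
        }
        // the real listStatus would now expand any matched directory one level
        return "OK";
    }
}
```

With a bare directory, the filter is asked only about the directory itself. A filter that rejects it produces null, which surfaces as "Input path does not exist". With `/*`, the candidates are the files, and a filter that rejects all of them produces "matches 0 files". These are the two exceptions seen in this post.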

Finding 1

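Without a wildcard, globStatusInternal treats the input as a plain path, so the only candidate the filter sees is the input directory itself. Our filter rejects that directory path, so matches ends up empty. Because patternHasGlob is false, the method then returns null, which listStatus reports as "Input path does not exist". So the input path must end with a wildcard (glob syntax, not a regular expression). The modified run script: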
[mw_shl_code=shell,true]
bin/hadoop jar ~/jar/hadoop_task.jar com.**.hadoop.mr.online.WebChannelPvNews -libjars /home/hadoop/jar/guava-r09.jar,/home/hadoop/jar/mysql-connector-java-5.1.34.jar /hive/uac/dt=2016-08-21/channel=web/pv/* 2016-08-21[/mw_shl_code]

This time the output is:
[mw_shl_code=java,true]
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.0
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.1
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.10
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.11
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.12
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.13
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.14
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.15
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.16
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.17
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.18
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.19
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.2
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.20
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.21
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.22
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.23
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.3
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.4
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.5
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.6
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.7
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.8
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.9
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.0
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.1
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.10
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.11
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.12
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.13
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.14
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.15
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.16
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.17
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.18
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.19
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.2
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.20
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.21
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.22
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.23
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.3
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.4
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.5
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.6
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.7
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.8
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.9
16/08/22 10:59:32 INFO mapreduce.JobSubmitter: Cleaning up the staging area /data/hadoop1_data/hadoop/staging/hadoop/.staging/job_1471259823017_0357
16/08/22 10:59:32 ERROR security.UserGroupInformation: PriviledgedActionException as:hadoop (auth:SIMPLE) cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/* matches 0 files
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/* matches 0 files
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:243)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:269)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:452)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:469)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:366)
    at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1218)
    at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1215)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1215)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1236)
    at com.howbuy.hadoop.mr.online.WebChannelPvNews.run(WebChannelPvNews.java:286)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.howbuy.hadoop.mr.online.WebChannelPvNews.main(WebChannelPvNews.java:293)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
[/mw_shl_code]
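It still throws an exception, but the change has taken effect: the error is now "matches 0 files". More importantly, the printed paths clearly show that the filter receives the complete path, the complete path! No wonder the regex never matched. String.matches() must match the entire string, and the pattern only describes the file name.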
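The effect is easy to check with the JDK alone. `str.matches(regex)` behaves like `Pattern.matches(regex, str)`: the whole input must match. By contrast, `Matcher.find()` searches for a match anywhere in the input. The class below is a hypothetical demo that reuses the post's file-name pattern. It also shows a trap: an unanchored `find()` for hour 1 accepts the file for hour 17, so a substring search would need a trailing `$`.

```java
import java.util.regex.Pattern;

/** Hypothetical demo of whole-string matching versus substring search. */
final class AnchoringDemo {
    /** The post's file-name pattern without the trailing hour. */
    static final String NAME_PRE = "web_pv\\.\\d{1,3}\\.[0-9]{4}-[0-9]{2}-[0-9]{2}\\.";

    private AnchoringDemo() {}

    /** What the original accept() did: the regex must cover the entire string. */
    static boolean wholeMatch(String s, int hour) {
        return s.matches(NAME_PRE + hour);
    }

    /** Succeeds if the pattern occurs anywhere in the string (no end anchor). */
    static boolean containsMatch(String s, int hour) {
        return Pattern.compile(NAME_PRE + hour).matcher(s).find();
    }
}
```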

Finding 2

The Path passed to the PathFilter is the full path, including the ${fs.defaultFS} prefix (here hdfs://Hadoop22.howbuy.local:9000).
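I came up with two fixes:
1. Prefix the regex with the value of ${fs.defaultFS} plus the input directory, so that it describes the whole path;
2. Match only the end of the path.
Since the file-naming rule is unlikely to change much, I chose option 2. It is simple (admittedly a bit lazy).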
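For comparison, here are two regex-based alternatives to the end-of-path check, sketched with plain strings. Option 1 makes the regex cover the whole path by quoting a known prefix (defaultFS plus input directory). A further variant applies the original file-name regex to the last path component only. In Hadoop, `Path.getName()` returns that component; the sketch uses `substring` so that it stays JDK-only. All names here are hypothetical.

```java
import java.util.regex.Pattern;

/** Hypothetical comparison of regex-based ways to test a fully qualified path. */
final class FullPathMatching {
    /** The post's file-name pattern without the trailing hour. */
    static final String NAME_PRE = "web_pv\\.\\d{1,3}\\.[0-9]{4}-[0-9]{2}-[0-9]{2}\\.";

    private FullPathMatching() {}

    /** Option 1: quote the known prefix (defaultFS + directory) so the regex spans the whole path. */
    static boolean byFullRegex(String fullPath, String prefix, int hour) {
        return fullPath.matches(Pattern.quote(prefix) + NAME_PRE + hour);
    }

    /** Variant: apply the original regex to the last path component only. */
    static boolean byFileName(String fullPath, int hour) {
        String name = fullPath.substring(fullPath.lastIndexOf('/') + 1);
        return name.matches(NAME_PRE + hour);
    }
}
```

Option 1 breaks whenever the cluster address or the input directory changes, because both are baked into the pattern. Matching on the file name alone avoids that and still checks the full naming rule.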
Modified code:
[mw_shl_code=java,true]
@Override
public boolean accept(Path path) {
    int hour = getConf().getInt(WEBPV_HOUR_FILTER_REGEX, getPreHour());
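    // "dt" (e.g. 2016-08-21) is read from the job Configuration; the driver code that sets it is not shown
    String dt = getConf().get("dt");
    String suffix = dt + "." + hour; // e.g. "2016-08-21.17"
    String pathValue = path.toString();
    System.out.println(pathValue);
    return pathValue.endsWith(suffix); // match the end of the full path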
}
[/mw_shl_code]
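To check that the end-of-path rule selects exactly the intended files, you can run it against a synthetic listing shaped like the one above: collector ids 27 and 28, hours 0 to 23. In this hypothetical sketch, hour 1 selects only the two `.1` files and not hours 10 to 19 or 21. That holds because the tail includes the date and the dot.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical check of the endsWith(dt + "." + hour) rule on a synthetic listing. */
final class EndsWithCheck {
    private EndsWithCheck() {}

    /** Builds paths like <dir>/web_pv.<id>.<dt>.<hour> for each id and hours 0..23. */
    static List<String> listing(String dir, String dt, int... ids) {
        List<String> paths = new ArrayList<>();
        for (int id : ids) {
            for (int h = 0; h < 24; h++) {
                paths.add(dir + "/web_pv." + id + "." + dt + "." + h);
            }
        }
        return paths;
    }

    /** Same test as the modified accept(): keep paths whose tail is exactly "<dt>.<hour>". */
    static List<String> select(List<String> paths, String dt, int hour) {
        String tail = dt + "." + hour;
        List<String> kept = new ArrayList<>();
        for (String p : paths) {
            if (p.endsWith(tail)) {
                kept.add(p);
            }
        }
        return kept;
    }
}
```

Note that this rule rejects directories. It therefore only works when the input glob expands directly to the files, as `/pv/*` does here.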
Rebuild and run again with the following script:
[mw_shl_code=shell,true]
bin/hadoop jar ~/jar/hadoop_task.jar com.**.hadoop.mr.online.WebChannelPvNews -libjars /home/hadoop/jar/guava-r09.jar,/home/hadoop/jar/mysql-connector-java-5.1.34.jar /hive/uac/dt=2016-08-21/channel=web/pv/* 2016-08-21[/mw_shl_code]

Result: finally, light at the end of the tunnel.
[mw_shl_code=java,true]
16/08/22 11:24:25 INFO mapreduce.Job: Running job: job_1471259823017_0362
16/08/22 11:24:31 INFO mapreduce.Job: Job job_1471259823017_0362 running in uber mode : false
16/08/22 11:24:31 INFO mapreduce.Job:  map 0% reduce 0%
16/08/22 11:24:36 INFO mapreduce.Job:  map 100% reduce 0%
16/08/22 11:24:47 INFO mapreduce.Job:  map 100% reduce 100%
16/08/22 11:24:47 INFO mapreduce.Job: Job job_1471259823017_0362 completed successfully
16/08/22 11:24:47 INFO mapreduce.Job: Counters: 47[/mw_shl_code]

Problem solved. Those are the problems I ran into while using PathFilter. I hope sharing them helps, and criticism is welcome!
Original post: http://blog.csdn.net/charsli/article/details/52274460


2 replies

lele posted on 2016-8-22 14:29:27
Nicely written, thanks to the original poster.

lingyufeng posted on 2016-8-23 09:12:47
Thanks for sharing, very detailed.