Questions this post addresses:
1. How does Hadoop filter files when analyzing data?
2. What do you need to watch out for when configuring the input path that gets filtered?
3. Does the filtered input path support regular expressions?
Below I share some of the problems I ran into while using PathFilter in an MR program.
Business scenario
A given directory in the HDFS warehouse contains many files. They are produced hour by hour by the data-collection side and uploaded to HDFS; the listing below shows one day's worth of data.
[mw_shl_code=shell,true]-rw-r--r-- 3 hadoop supergroup 383638 2016-08-21 01:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.0
-rw-r--r-- 3 hadoop supergroup 282175 2016-08-21 02:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.1
-rw-r--r-- 3 hadoop supergroup 921844 2016-08-21 11:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.10
-rw-r--r-- 3 hadoop supergroup 787638 2016-08-21 12:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.11
-rw-r--r-- 3 hadoop supergroup 619239 2016-08-21 13:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.12
-rw-r--r-- 3 hadoop supergroup 776499 2016-08-21 14:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.13
-rw-r--r-- 3 hadoop supergroup 959702 2016-08-21 15:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.14
-rw-r--r-- 3 hadoop supergroup 932658 2016-08-21 16:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.15
-rw-r--r-- 3 hadoop supergroup 958502 2016-08-21 17:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.16
-rw-r--r-- 3 hadoop supergroup 828683 2016-08-21 18:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.17
-rw-r--r-- 3 hadoop supergroup 1084602 2016-08-21 19:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.18
-rw-r--r-- 3 hadoop supergroup 1090640 2016-08-21 20:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.19
-rw-r--r-- 3 hadoop supergroup 191149 2016-08-21 03:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.2
-rw-r--r-- 3 hadoop supergroup 712515 2016-08-21 21:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.20
-rw-r--r-- 3 hadoop supergroup 955075 2016-08-21 22:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.21
-rw-r--r-- 3 hadoop supergroup 1006990 2016-08-21 23:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.22
-rw-r--r-- 3 hadoop supergroup 581031 2016-08-22 00:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.23
-rw-r--r-- 3 hadoop supergroup 355898 2016-08-21 04:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.3
-rw-r--r-- 3 hadoop supergroup 402907 2016-08-21 05:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.4
-rw-r--r-- 3 hadoop supergroup 172758 2016-08-21 06:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.5
-rw-r--r-- 3 hadoop supergroup 300429 2016-08-21 07:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.6
-rw-r--r-- 3 hadoop supergroup 358782 2016-08-21 08:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.7
-rw-r--r-- 3 hadoop supergroup 323841 2016-08-21 09:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.8
-rw-r--r-- 3 hadoop supergroup 1066597 2016-08-21 10:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.9
-rw-r--r-- 3 hadoop supergroup 266572 2016-08-21 01:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.0
-rw-r--r-- 3 hadoop supergroup 141310 2016-08-21 02:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.1
-rw-r--r-- 3 hadoop supergroup 282398 2016-08-21 11:02 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.10
-rw-r--r-- 3 hadoop supergroup 319551 2016-08-21 12:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.11
-rw-r--r-- 3 hadoop supergroup 391468 2016-08-21 13:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.12
-rw-r--r-- 3 hadoop supergroup 330251 2016-08-21 14:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.13
-rw-r--r-- 3 hadoop supergroup 224677 2016-08-21 15:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.14
-rw-r--r-- 3 hadoop supergroup 355869 2016-08-21 16:02 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.15
-rw-r--r-- 3 hadoop supergroup 364699 2016-08-21 17:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.16
-rw-r--r-- 3 hadoop supergroup 437244 2016-08-21 18:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.17
-rw-r--r-- 3 hadoop supergroup 840551 2016-08-21 19:09 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.18
-rw-r--r-- 3 hadoop supergroup 264048 2016-08-21 20:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.19
-rw-r--r-- 3 hadoop supergroup 121042 2016-08-21 03:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.2
-rw-r--r-- 3 hadoop supergroup 613788 2016-08-21 21:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.20
-rw-r--r-- 3 hadoop supergroup 538134 2016-08-21 22:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.21
-rw-r--r-- 3 hadoop supergroup 355747 2016-08-21 23:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.22
-rw-r--r-- 3 hadoop supergroup 568559 2016-08-22 00:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.23
-rw-r--r-- 3 hadoop supergroup 91631 2016-08-21 04:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.3
-rw-r--r-- 3 hadoop supergroup 96407 2016-08-21 05:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.4
-rw-r--r-- 3 hadoop supergroup 154202 2016-08-21 06:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.5
-rw-r--r-- 3 hadoop supergroup 169538 2016-08-21 07:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.6
-rw-r--r-- 3 hadoop supergroup 441323 2016-08-21 08:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.7
-rw-r--r-- 3 hadoop supergroup 636955 2016-08-21 09:00 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.8
-rw-r--r-- 3 hadoop supergroup 294494 2016-08-21 10:02 /hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.9[/mw_shl_code]
The requirement is to read the previous hour's data at two minutes past every hour; for example, if the current time is 18:02, we need to read the data files whose suffix is 17.
Since Hadoop provides the org.apache.hadoop.fs.PathFilter interface, I tried using it together with a regular expression to pick out exactly the files we want.
Main code
[mw_shl_code=java,true]
static final String WEBPV_HOUR_FILTER_REGEX = "webpv.hour.filter.regex"; // config key under which the driver stores the hour to filter on
/**
 * Regex prefix used to match the hourly data files
 */
static final String WEBPV_HOUR_FILTER_REGEX_PATTERN_PRE = "web_pv\\.\\d{1,3}\\.[0-9]{4}-[0-9]{2}-[0-9]{2}\\.";

/**
 * PathFilter for the hourly webpv files
 * @author qiankun.li
 */
public static class WebPvPathHourFilter extends Configured implements PathFilter {

    public WebPvPathHourFilter() {}

    @Override
    public boolean accept(Path path) {
        int hour = getConf().getInt(WEBPV_HOUR_FILTER_REGEX, getPreHour());
        String pathValue = path.toString();
        return pathValue.matches(WEBPV_HOUR_FILTER_REGEX_PATTERN_PRE + hour);
    }
}

public static int getPreHour() {
    Calendar now = Calendar.getInstance();
    now.set(Calendar.HOUR_OF_DAY, now.get(Calendar.HOUR_OF_DAY) - 1);
    int hour = now.get(Calendar.HOUR_OF_DAY);
    return hour;
}
[/mw_shl_code]
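To sanity-check the pattern before wiring the filter into a job, you can match it against a bare file name taken from the listing above. A minimal standalone sketch (the class name RegexCheck is mine, not part of the original code):
[mw_shl_code=java,true]
public class RegexCheck {
    // same prefix as WEBPV_HOUR_FILTER_REGEX_PATTERN_PRE above
    static final String PREFIX = "web_pv\\.\\d{1,3}\\.[0-9]{4}-[0-9]{2}-[0-9]{2}\\.";

    public static void main(String[] args) {
        int hour = 17;
        // a bare file name from the listing matches the pattern for hour 17
        System.out.println("web_pv.27.2016-08-21.17".matches(PREFIX + hour)); // prints true
    }
}[/mw_shl_code]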
Set the job configuration value and the input path filter
[mw_shl_code=java,true]
int hour = getPreHour(); // defaults to the previous hour
job.getConfiguration().setInt(WEBPV_HOUR_FILTER_REGEX, hour);
FileInputFormat.setInputPathFilter(job, WebPvPathHourFilter.class);[/mw_shl_code]
With the code in place, build the jar; the command used to run it on Linux is as follows
[mw_shl_code=shell,true]
bin/hadoop jar ~/jar/hadoop_task.jar com.**.hadoop.mr.online.WebChannelPvNews -libjars /home/hadoop/jar/guava-r09.jar,/home/hadoop/jar/mysql-connector-java-5.1.34.jar /hive/uac/dt=2016-08-21/channel=web/pv 2016-08-21[/mw_shl_code]
The run produced the following result:
[mw_shl_code=java,true]
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:243)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:269)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:452)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:469)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:366)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1218)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1215)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1215)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1236)
at com.howbuy.hadoop.mr.online.WebChannelPvNews.run(WebChannelPvNews.java:286)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.howbuy.hadoop.mr.online.WebChannelPvNews.main(WebChannelPvNews.java:293)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:208)[/mw_shl_code]
The job throws an exception. Why? Is the regular expression wrong? First I tried printing the paths inside the filter:
[mw_shl_code=java,true]
@Override
public boolean accept(Path path) {
    int hour = getConf().getInt(WEBPV_HOUR_FILTER_REGEX, getPreHour());
    String pathValue = path.toString();
    System.out.println(pathValue);
    return pathValue.matches(WEBPV_HOUR_FILTER_REGEX_PATTERN_PRE + hour);
}[/mw_shl_code]
Running it again gives the same result as above. Let's look at the source code where the exception is raised:
[mw_shl_code=java,true]
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
List<IOException> errors = new ArrayList<IOException>();
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
for (int i=0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));//异常
} else if (matches.length == 0) {
errors.add(new IOException("Input Pattern " + p + " matches 0 files"));//异常
} else {
for (FileStatus globStat: matches) {
if (globStat.isDirectory()) {
for(FileStatus stat: fs.listStatus(globStat.getPath(),
inputFilter)) {
result.add(stat);
}
} else {
result.add(globStat);
}
}
}
}
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
[/mw_shl_code]
Looking inside fs.globStatus(p, inputFilter), it calls the following code:
[mw_shl_code=java,true]
@SuppressWarnings("unchecked")
private List<FileStatus> globStatusInternal(Path pathPattern,
PathFilter filter) throws IOException {
boolean patternHasGlob = false; // pathPattern has any globs
List<FileStatus> matches = new ArrayList<FileStatus>();
// determine starting point
int level = 0;
String baseDir = Path.CUR_DIR;
if (pathPattern.isAbsolute()) {
level = 1; // need to skip empty item at beginning of split list
baseDir = Path.SEPARATOR;
}
// parse components and determine if it's a glob
String[] components = null;
GlobFilter[] filters = null;
String filename = pathPattern.toUri().getPath();
if (!filename.isEmpty() && !Path.SEPARATOR.equals(filename)) {
components = filename.split(Path.SEPARATOR);
filters = new GlobFilter[components.length];
for (int i=level; i < components.length; i++) {
filters[i] = new GlobFilter(components[i]);
// check whether this component contains a glob; if it does, set patternHasGlob
patternHasGlob |= filters[i].hasPattern();
}
if (!patternHasGlob) {
baseDir = unquotePathComponent(filename);
components = null; // short through to filter check
}
}
// seed the parent directory path, return if it doesn't exist
try {
matches.add(getFileStatus(new Path(baseDir)));
} catch (FileNotFoundException e) {
return patternHasGlob ? matches : null;
}
// skip if there are no components other than the basedir
if (components != null) {
// iterate through each path component
for (int i=level; (i < components.length) && !matches.isEmpty(); i++) {
List<FileStatus> children = new ArrayList<FileStatus>();
for (FileStatus match : matches) {
// don't look for children in a file matched by a glob
if (!match.isDirectory()) {
continue;
}
try {
if (filters[i].hasPattern()) {
// get all children matching the filter
FileStatus[] statuses = listStatus(match.getPath(), filters[i]);
children.addAll(Arrays.asList(statuses));
} else {
// the component does not have a pattern
String component = unquotePathComponent(components[i]);
Path child = new Path(match.getPath(), component);
children.add(getFileStatus(child));
}
} catch (FileNotFoundException e) {
// don't care
}
}
matches = children;
}
}
// remove anything that didn't match the filter
if (!matches.isEmpty()) {
Iterator<FileStatus> iter = matches.iterator();
while (iter.hasNext()) {
if (!filter.accept(iter.next().getPath())) {
iter.remove();
}
}
}
// no final paths, if there were any globs return empty list
if (matches.isEmpty()) {
// with no glob in the pattern, null is returned here, which is exactly why the exception is thrown
return patternHasGlob ? matches : null;
}
Collections.sort(matches);
return matches;
}[/mw_shl_code]
Finding 1
The input path needs to be followed by a wildcard (glob) pattern, much like a regular expression, so modify the run command as follows (the glob could also be appended in the driver code; see the sketch after the command):
[mw_shl_code=shell,true]
bin/hadoop jar ~/jar/hadoop_task.jar com.**.hadoop.mr.online.WebChannelPvNews -libjars /home/hadoop/jar/guava-r09.jar,/home/hadoop/jar/mysql-connector-java-5.1.34.jar /hive/uac/dt=2016-08-21/channel=web/pv/* 2016-08-21[/mw_shl_code]
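For completeness, the same glob can be appended in the driver code instead of the shell command. A rough sketch; inputDir is my own placeholder for the first command-line argument, not a name from the original driver:
[mw_shl_code=java,true]
// hypothetical driver fragment: append "*" so FileInputFormat treats the input
// as a glob pattern rather than a bare directory
String inputDir = args[0]; // e.g. /hive/uac/dt=2016-08-21/channel=web/pv
FileInputFormat.addInputPath(job, new Path(inputDir, "*"));[/mw_shl_code]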
This time the filter prints the following:
[mw_shl_code=java,true]
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.0
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.1
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.10
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.11
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.12
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.13
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.14
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.15
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.16
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.17
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.18
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.19
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.2
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.20
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.21
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.22
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.23
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.3
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.4
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.5
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.6
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.7
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.8
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.27.2016-08-21.9
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.0
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.1
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.10
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.11
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.12
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.13
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.14
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.15
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.16
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.17
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.18
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.19
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.2
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.20
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.21
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.22
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.23
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.3
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.4
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.5
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.6
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.7
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.8
hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/web_pv.28.2016-08-21.9
16/08/22 10:59:32 INFO mapreduce.JobSubmitter: Cleaning up the staging area /data/hadoop1_data/hadoop/staging/hadoop/.staging/job_1471259823017_0357
16/08/22 10:59:32 ERROR security.UserGroupInformation: PriviledgedActionException as:hadoop (auth:SIMPLE) cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/* matches 0 files
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input Pattern hdfs://Hadoop22.howbuy.local:9000/hive/uac/dt=2016-08-21/channel=web/pv/* matches 0 files
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:243)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:269)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:452)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:469)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:366)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1218)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1215)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1215)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1236)
at com.howbuy.hadoop.mr.online.WebChannelPvNews.run(WebChannelPvNews.java:286)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.howbuy.hadoop.mr.online.WebChannelPvNews.main(WebChannelPvNews.java:293)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
[/mw_shl_code]
It still throws an exception, but the change clearly took effect: the error is now "matches 0 files". More importantly, look at the paths that were printed: they are full URIs, full URIs! No wonder the regular expression never matched.
Finding 2
The Path handed to the PathFilter is the complete path, carrying the ${fs.defaultFS} prefix.
Two solutions came to mind:
1. Prepend the value of ${fs.defaultFS} to the regular expression;
2. Match only the end of the file name.
Since the file-naming convention is unlikely to change much, I went with option 2, which is simpler (admittedly a bit lazy); a rough sketch of option 1 is included below for reference.
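For the record, option 1 might look roughly like the code below. This is only an untested sketch; instead of concatenating the literal ${fs.defaultFS} value it allows any prefix (scheme, authority and directories) in front of the file name:
[mw_shl_code=java,true]
@Override
public boolean accept(Path path) {
    int hour = getConf().getInt(WEBPV_HOUR_FILTER_REGEX, getPreHour());
    // ".*/" absorbs the hdfs://host:port/... prefix of the full path
    String regex = ".*/" + WEBPV_HOUR_FILTER_REGEX_PATTERN_PRE + hour;
    return path.toString().matches(regex);
}[/mw_shl_code]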
The modified code for option 2:
[mw_shl_code=java,true]
@Override
public boolean accept(Path path) {
    int hour = getConf().getInt(WEBPV_HOUR_FILTER_REGEX, getPreHour());
    String dt = getConf().get("dt");
    String suffix = dt + "." + hour;
    String pathValue = path.toString();
    System.out.println(pathValue);
    return pathValue.endsWith(suffix); // match only the end of the full path
}
[/mw_shl_code]
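Note that the modified accept() also reads a "dt" value from the configuration, so the driver has to set it alongside the hour. A sketch, assuming the date comes from the second command-line argument shown in the run command:
[mw_shl_code=java,true]
// hypothetical driver fragment: pass both the hour and the dt partition date to the filter
job.getConfiguration().setInt(WEBPV_HOUR_FILTER_REGEX, getPreHour());
job.getConfiguration().set("dt", args[1]); // e.g. "2016-08-21"
FileInputFormat.setInputPathFilter(job, WebPvPathHourFilter.class);[/mw_shl_code]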
Rebuild the jar and run it again with the same command:
[mw_shl_code=shell,true]
bin/hadoop jar ~/jar/hadoop_task.jar com.**.hadoop.mr.online.WebChannelPvNews -libjars /home/hadoop/jar/guava-r09.jar,/home/hadoop/jar/mysql-connector-java-5.1.34.jar /hive/uac/dt=2016-08-21/channel=web/pv/* 2016-08-21[/mw_shl_code]
The result: finally, the light at the end of the tunnel
[mw_shl_code=java,true]
16/08/22 11:24:25 INFO mapreduce.Job: Running job: job_1471259823017_0362
16/08/22 11:24:31 INFO mapreduce.Job: Job job_1471259823017_0362 running in uber mode : false
16/08/22 11:24:31 INFO mapreduce.Job: map 0% reduce 0%
16/08/22 11:24:36 INFO mapreduce.Job: map 100% reduce 0%
16/08/22 11:24:47 INFO mapreduce.Job: map 100% reduce 100%
16/08/22 11:24:47 INFO mapreduce.Job: Job job_1471259823017_0362 completed successfully
16/08/22 11:24:47 INFO mapreduce.Job: Counters: 47[/mw_shl_code]
Problem solved. These are the issues I ran into while using PathFilter; I'm sharing them in the hope that they help someone. Feedback and corrections are welcome!
Original post: http://blog.csdn.net/charsli/article/details/52274460