问题导读
1.hadoop有哪些内置计数器?
2.job.getCounters()可以得到什么?
3.MapReduce是否允许用户自定义计数器?
简述:Hadoop计数器:可以让开发人员以全局的视角来审查相关作业的运行情况以及各项指标,及时做出错误诊断并进行相应处理。
相比而言,计数器方式比日志更易于分析。 内置计数器:(1)Hadoop内置的计数器,主要用来记录作业的执行情况
(2)内置计数器包括如下:
—MapReduce框架计数器(Map-Reduce Framework)
—文件系统计数器(File System Counters)
—作业计数器(Job Counters)
—文件输入格式计数器(File Output Format Counters)
—文件输出格式计数器(File Input Format Counters)
—Shuffle 错误计数器(Shuffle Errors)
(3)计数器由相关的task进行维护,定期传递给tasktracker,再由tasktracker传给jobtracker;
(4)最终的作业计数器实际上是由jobtracker维护,所以计数器可以被全局汇总,同时也不必在整个网络中传递。
(5)只有当一个作业执行成功后,最终的计数器的值才是完整可靠的; [mw_shl_code=bash,true]内置计数器:
15/06/15 08:46:47 INFO mapreduce.Job: Job job_1434248323399_0004 completed successfully
15/06/15 08:46:47 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=103
FILE: Number of bytes written=315873
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=116
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Job Counters
Launched map tasks=1
Launched reduce tasks=2
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=2893
Total time spent by all reduces in occupied slots (ms)=6453
Total time spent by all map tasks (ms)=2893
Total time spent by all reduce tasks (ms)=6453
Total vcore-seconds taken by all map tasks=2893
Total vcore-seconds taken by all reduce tasks=6453
Total megabyte-seconds taken by all map tasks=2962432
Total megabyte-seconds taken by all reduce tasks=6607872
Map-Reduce Framework
Map input records=7
Map output records=7
Map output bytes=77
Map output materialized bytes=103
Input split bytes=95
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=103
Reduce input records=7
Reduce output records=2
Spilled Records=14
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=59
CPU time spent (ms)=3600
Physical memory (bytes) snapshot=606015488
Virtual memory (bytes) snapshot=2672865280
Total committed heap usage (bytes)=602996736
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=21
File Output Format Counters
Bytes Written=40[/mw_shl_code]
计数器使用:1、Web UI进行查看
(注:要启动历史服务器)
2、命令行方式:
hadoop job -counter(Hadoop2.x无效) 3、使用Hadoop API
通过job.getCounters()得到Counters,而后调用counters.findCounter()方法去得到计数器对象;查看最终的计数器的值需要等作业完成之后。 自定义计数器及实例:MapReduce允许用户自定义计数器,计数器是一个全局变量,计数器有组的概念,可以用Java的枚举类型或者用字符串来定义方法; [mw_shl_code=java,true]package org.apache.hadoop.mapreduce;
public interface TaskAttemptContext extends JobContext, Progressable {
//Get the {@link Counter} for the given
//<code>counterName</code>.
public Counter getCounter(Enum<?> counterName);
//Get the {@link Counter} for the given
//<code>groupName</code> and <code>counterName</code>.
public Counter getCounter(String groupName, String counterName);
}[/mw_shl_code] 字符串方式(动态计数器)比枚举类型要更加灵活,可以动态在一个组下面添加多个计数器;在旧API中使用Reporter,而新API用context.getCounter(groupName,counterName)来获取计数器配置并设置;然后让计数器递增。 [mw_shl_code=java,true]package org.apache.hadoop.mapreduce;
/**
* A named counter that tracks the progress of a map/reduce job.
* <p><code>Counters</code> represent global counters, defined either by the
* Map-Reduce framework or applications. Each <code>Counter</code> is named by
* an {@link Enum} and has a long for the value.</p>
* <p><code>Counters</code> are bunched into Groups, each comprising of
* counters from a particular <code>Enum</code> class.
*/
public interface Counter extends Writable {
/**
* Increment this counter by the given value
* @param incr the value to increase this counter by
*/
void increment(long incr);
}[/mw_shl_code] 自定义计数器实例
统计词汇行中词汇数超过2个或少于2个的行数:
输入数据文件counter [mw_shl_code=bash,true][root@liguodong file]# vi counter
[root@liguodong file]# hdfs dfs -put counter /counter
[root@liguodong file]# hdfs dfs -cat /counter
hello world
hello hadoop
hi baby
hello 4325 7785993
java hadoop
come[/mw_shl_code] [mw_shl_code=java,true]package MyCounter;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import MyPartitioner.MyPartitioner;
import MyPartitioner.MyPartitioner.DefPartitioner;
import MyPartitioner.MyPartitioner.MyMapper;
import MyPartitioner.MyPartitioner.MyReducer;
public class MyCounter {
private final static String INPUT_PATH = "hdfs://liguodong:8020/counter";
private final static String OUTPUT_PATH = "hdfs://liguodong:8020/outputcounter";
public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String[] val = value.toString().split("\\s+");
if(val.length<2){
context.getCounter("ErrorCounter","below_2").increment(1);
}else if(val.length>2){
context.getCounter("ErrorCounter", "above_2").increment(1);
}
context.write(key, value);
}
}
public static void main(String[] args) throws IllegalArgumentException, IOException,
URISyntaxException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
if(fileSystem.exists(new Path(OUTPUT_PATH)))
{
fileSystem.delete(new Path(OUTPUT_PATH),true);
}
Job job = Job.getInstance(conf, "define counter");
job.setJarByClass(MyPartitioner.class);
FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
//提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}[/mw_shl_code] [mw_shl_code=bash,true]运行结果:
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 25
File System Counters
FILE: Number of bytes read=148
FILE: Number of bytes written=187834
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=69
HDFS: Number of bytes written=86
HDFS: Number of read operations=8
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Map-Reduce Framework
Map input records=6
Map output records=6
Input split bytes=94
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=12
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=16252928
ErrorCounter
above_2=1
below_2=1
File Input Format Counters
Bytes Read=69
File Output Format Counters
Bytes Written=86[/mw_shl_code]
|