翻笔记时发现了我的hadoop目录下存着很多东西,今天把源码解析里的部分共享出来,后续待我深入后继续分享
share your knowledge with the world!
==============================
//目标:了解Configuration各成员变量的具体含义,
//Configuration类中的其他部分都是为了操作这些变量而实现的解析、设置、获取方法
//其中资源的加载主要是通过:addResource方法和addDefaultResource静态方法加载的,
// 前者是加载系统默认配置文件之外的资源,后者则是加载系统默认资源如:hdfs-default.xml
//hadoop的配置文件都是xml格式的,使用JAXP中的DOM方法处理xml,JAXP有两种处理xml的方法(SAX和DOM)
//因为hadoop的配置文件都比较小所以采用DOM文档树形式处理,SAX适合处理大的xml文件
package org.apache.hadoop.conf;
static{
//print deprecation warning if hadoop-site.xml is found in classpath
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
if(cL.getResource("hadoop-site.xml")!=null) {
LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
"Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
+ "mapred-site.xml and hdfs-site.xml to override properties of " +
"core-default.xml, mapred-default.xml and hdfs-default.xml " +
"respectively");
}
addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");
}
// The two fields below are both involved in loading configuration data.
// properties: built on demand by getProps(); setting it to null triggers a reload.
private Properties properties;
// overlay: when non-null, getProps() puts its entries on top of the loaded properties.
private Properties overlay;
// Class loader used by this configuration to look up resources by name.
private ClassLoader classLoader;
{
  // Use the thread context class loader when one is set; otherwise fall back
  // to the loader that defined Configuration.
  final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
  classLoader = (contextLoader != null)
      ? contextLoader
      : Configuration.class.getClassLoader();
}
// Author's note: older versions overloaded addResource for InputStream, Path,
// String and URL; the version excerpted here has six overloads.
/**
 * Adds a resource identified by name; a String resource is resolved as a
 * classpath resource when the configuration is loaded.
 *
 * @param name name of the resource to add.
 */
public void addResource(String name) {
  final Resource classpathResource = new Resource(name);
  addResourceObject(classpathResource);
}
/**
 * Adds the properties of another configuration (as returned by its
 * {@code getProps()}) as a resource of this one.
 *
 * @param conf configuration whose properties are added.
 */
public void addResource(Configuration conf) {
  final Resource propertiesResource = new Resource(conf.getProps());
  addResourceObject(propertiesResource);
}
/**
 * Adds a resource backed by an input stream, labelled with the given name.
 *
 * @param in   stream to read the configuration from.
 * @param name name associated with the stream resource.
 */
public void addResource(InputStream in, String name) {
  final Resource namedStreamResource = new Resource(in, name);
  addResourceObject(namedStreamResource);
}
/**
 * Adds a resource backed by an input stream.
 *
 * @param in stream to read the configuration from.
 */
public void addResource(InputStream in) {
  final Resource streamResource = new Resource(in);
  addResourceObject(streamResource);
}
/**
 * Adds a resource identified by a file path.
 *
 * @param file path of the configuration file to add.
 */
public void addResource(Path file) {
  final Resource fileResource = new Resource(file);
  addResourceObject(fileResource);
}
/**
 * Adds a resource identified by a URL (author's note: used for resources
 * outside the system default configuration files).
 *
 * @param url location of the resource to add.
 */
public void addResource(URL url) {
  final Resource urlResource = new Resource(url);
  addResourceObject(urlResource);
}
/**
 * Get the {@link URL} for the named resource.
 *
 * <p>The lookup is delegated to this configuration's class loader, so the
 * name is not limited to configuration files; any resource the loader can see
 * (images, audio, text, ...) can be located this way.
 *
 * @param name resource name.
 * @return the url for the named resource, exactly as returned by the class loader.
 */
public URL getResource(String name) {
  final URL location = classLoader.getResource(name);
  return location;
}
/**
 * Drops the loaded properties and clears {@code finalParameters}.
 *
 * <p>addDefaultResource calls this on registered configurations after a new
 * default resource is added, so the values are re-read the next time the
 * properties are requested.
 */
public synchronized void reloadConfiguration() {
  this.properties = null; // trigger reload
  this.finalParameters.clear(); // clear site-limits
}
/**
 * List of configuration resources.
 * addResourceObject appends to this list, and getProps() passes it to
 * loadResources when the properties are rebuilt.
 */
private ArrayList<Resource> resources = new ArrayList<Resource>();
/**
 * Appends the resource to the resource list and then calls
 * reloadConfiguration(), which clears properties and finalParameters so the
 * new resource is picked up on the next load.
 *
 * @param resource resource wrapper to register.
 */
private synchronized void addResourceObject(Resource resource) {
  this.resources.add(resource); // add to resources
  this.reloadConfiguration();
}
/**
 * List of default Resources. Resources are loaded in the order of the list
 * entries
 */
// Static member holding the names of the default configuration resources;
// addDefaultResource appends each new name to it.
private static final CopyOnWriteArrayList<String> defaultResources =
new CopyOnWriteArrayList<String>();
// Flag checked by addDefaultResource: only configurations whose loadDefaults
// is true are reloaded when a new default resource is registered.
private boolean loadDefaults = true;
/**
 * Configuration objects.
 * addDefaultResource iterates over the keys of this map to reload them.
 */
private static final WeakHashMap<Configuration,Object> REGISTRY =
new WeakHashMap<Configuration,Object>();
/**
 * Reload configuration from previously added resources.
 *
 * <p>This method clears all the configuration read from the added resources,
 * as well as the final parameters, so the resources are read again before
 * values are next accessed. Values that were added via set methods will
 * overlay the values read from the resources.
 *
 * <p>addDefaultResource calls this after registering a new default resource.
 */
public synchronized void reloadConfiguration() {
  this.properties = null; // trigger reload
  this.finalParameters.clear(); // clear site-limits
}
/**
 * List of default Resources. Resources are loaded in the order of the list
 * entries
 */
// Static member holding the names of the default configuration resources.
private static final CopyOnWriteArrayList<String> defaultResources =
new CopyOnWriteArrayList<String>();
/**
 * Add a default resource. Resources are loaded in the order of the resources
 * added.
 *
 * <p>Author's notes on how this is used:
 * <ul>
 *   <li>HDFS registers hdfs-default.xml and hdfs-site.xml as default resources
 *       through this method; MapReduce does the same with mapred-default.xml
 *       and mapred-site.xml. The names are kept in {@code defaultResources}.</li>
 *   <li>The method is synchronized and a name is only registered once: a name
 *       already present in {@code defaultResources} is ignored.</li>
 *   <li>Nothing is parsed here. Each configuration in {@code REGISTRY} whose
 *       {@code loadDefaults} flag is set has reloadConfiguration() called on
 *       it, which clears its properties and final parameters so the resources
 *       are loaded again later.</li>
 * </ul>
 *
 * @param name file name. File should be present in the classpath.
 */
public static synchronized void addDefaultResource(String name) {
  // addIfAbsent returns false when the name was already registered.
  if (!defaultResources.addIfAbsent(name)) {
    return;
  }
  for (Configuration registered : REGISTRY.keySet()) {
    if (registered.loadDefaults) {
      registered.reloadConfiguration();
    }
  }
}
比如HdfsConfiguration类中就使用这个方法加载系统默认的配置文件
package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DeprecatedKeys;
/**
* Adds deprecated keys into the configuration.
*/
@InterfaceAudience.Private
public class HdfsConfiguration extends Configuration {
// Registers the HDFS default resources when the class is initialized.
static {
  addDeprecatedKeys();

  // adds the default resources, in this order
  for (String defaultResource : new String[] {"hdfs-default.xml", "hdfs-site.xml"}) {
    Configuration.addDefaultResource(defaultResource);
  }
}
/**
 * Returns the properties of this configuration, building them on first use.
 *
 * <p>Loading is lazy: the resources are only parsed when the data is actually
 * requested. reloadConfiguration() (called from addDefaultResource and
 * addResourceObject) sets {@code properties} back to null, which makes the
 * next call here load everything again. {@code properties} is an instance
 * field, not a static one.
 *
 * @return the loaded properties, with any overlay entries applied on top.
 */
protected synchronized Properties getProps() {
  if (properties != null) {
    return properties;
  }

  properties = new Properties();
  // Copy of updatingResource taken before loading, used below to restore the
  // recorded sources of keys that come from the overlay.
  final Map<String, String[]> sourcesBeforeLoad =
      new ConcurrentHashMap<String, String[]>(updatingResource);
  loadResources(properties, resources, quietmode);

  if (overlay == null) {
    return properties;
  }

  properties.putAll(overlay);
  for (Object overlayKey : overlay.keySet()) {
    final String key = (String) overlayKey;
    final String[] previousSource = sourcesBeforeLoad.get(key);
    if (previousSource != null) {
      updatingResource.put(key, previousSource);
    }
  }
  return properties;
}
/**
 * Parses a single resource and loads the properties it defines.
 *
 * Outline:
 *   1. name starts as UNKNOWN_RESOURCE and is then replaced by the wrapper's name.
 *   2. A DOM parser factory is created and configured: comments are ignored,
 *      namespace support is enabled, and XInclude is enabled so that a
 *      configuration can be split into several smaller XML files that are
 *      processed together.
 *   3. A DocumentBuilder is obtained and parse(...) is called according to the
 *      resource type (URL, String, Path, InputStream). Properties and Element
 *      resources are not parsed.
 *   4. The DOM is walked and each property is handed to loadProperty(...),
 *      which presumably records the value and the final-parameter flag.
 *
 * @param properties destination for the loaded properties.
 * @param wrapper    resource wrapper supplying the resource object and its name.
 * @param quiet      when true, a resource that yields no document returns null
 *                   instead of throwing.
 * @return a Resource wrapping the parsed properties when the resource was an
 *         InputStream; null otherwise.
 * @throws RuntimeException if the resource is not found and quiet is false, or
 *         wrapping an IOException, DOMException, SAXException or
 *         ParserConfigurationException raised while parsing.
 */
private Resource loadResource(Properties properties, Resource wrapper, boolean quiet) {
String name = UNKNOWN_RESOURCE;
try {
Object resource = wrapper.getResource();
name = wrapper.getName();
DocumentBuilderFactory docBuilderFactory
= DocumentBuilderFactory.newInstance();
//ignore all comments inside the xml file
docBuilderFactory.setIgnoringComments(true);
//enable namespace support, then allow includes in the xml file
docBuilderFactory.setNamespaceAware(true);
try {
docBuilderFactory.setXIncludeAware(true);
} catch (UnsupportedOperationException e) {
// A parser without XInclude support is logged but does not abort loading.
LOG.error("Failed to set setXIncludeAware(true) for parser "
+ docBuilderFactory
+ ":" + e,
e);
}
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = null;
Element root = null;
// Set only for InputStream resources; see the end of the method.
boolean returnCachedProperties = false;
if (resource instanceof URL) { // an URL resource
doc = parse(builder, (URL)resource);
} else if (resource instanceof String) { // a CLASSPATH resource
URL url = getResource((String)resource);
doc = parse(builder, url);
} else if (resource instanceof Path) { // a file resource
// Can't use FileSystem API or we get an infinite loop
// since FileSystem uses Configuration API. Use java.io.File instead.
File file = new File(((Path)resource).toUri().getPath())
.getAbsoluteFile();
if (file.exists()) {
if (!quiet) {
LOG.debug("parsing File " + file);
}
// NOTE(review): the FileInputStream opened here is not closed in this
// method; presumably parse(...) closes it - confirm.
doc = parse(builder, new BufferedInputStream(
new FileInputStream(file)), ((Path)resource).toString());
}
} else if (resource instanceof InputStream) {
doc = parse(builder, (InputStream) resource, null);
returnCachedProperties = true;
} else if (resource instanceof Properties) {
overlay(properties, (Properties)resource);
} else if (resource instanceof Element) {
root = (Element)resource;
}
// No Element was supplied directly: use the parsed document's root element,
// or treat the resource as missing when nothing was parsed.
// NOTE(review): a Properties resource also arrives here with doc == null, so
// it returns null when quiet and throws "not found" otherwise - confirm intended.
if (root == null) {
if (doc == null) {
if (quiet) {
return null;
}
throw new RuntimeException(resource + " not found");
}
root = doc.getDocumentElement();
}
// For InputStream resources the values are collected in a fresh Properties
// object so they can be returned to the caller as a Resource.
Properties toAddTo = properties;
if(returnCachedProperties) {
toAddTo = new Properties();
}
// The root element should be <configuration>; a mismatch is only logged.
if (!"configuration".equals(root.getTagName()))
LOG.fatal("bad conf file: top-level element not <configuration>");
// Walk all child nodes of the root; nodes that are not Elements are skipped.
NodeList props = root.getChildNodes();
DeprecationContext deprecations = deprecationContext.get();
for (int i = 0; i < props.getLength(); i++) {
Node propNode = props.item(i);
if (!(propNode instanceof Element))
continue;
Element prop = (Element)propNode;
// A nested <configuration> element is loaded by a recursive call
// (a <configuration> may itself contain <configuration> children).
if ("configuration".equals(prop.getTagName())) {
loadResource(toAddTo, new Resource(prop, name), quiet);
continue;
}
// Any other child is expected to be a <property>; anything else is only
// warned about and then processed like a property.
if (!"property".equals(prop.getTagName()))
LOG.warn("bad conf file: element not <property>");
String attr = null;
String value = null;
boolean finalParameter = false;
LinkedList<String> source = new LinkedList<String>();
// First read name/value/final/source given as XML attributes of the element.
Attr propAttr = prop.getAttributeNode("name");
if (propAttr != null)
attr = StringInterner.weakIntern(propAttr.getValue());
propAttr = prop.getAttributeNode("value");
if (propAttr != null)
value = StringInterner.weakIntern(propAttr.getValue());
propAttr = prop.getAttributeNode("final");
if (propAttr != null)
finalParameter = "true".equals(propAttr.getValue());
propAttr = prop.getAttributeNode("source");
if (propAttr != null)
source.add(StringInterner.weakIntern(propAttr.getValue()));
// Then read name, value, final and source given as child elements; these
// overwrite the attribute values (source entries are appended).
NodeList fields = prop.getChildNodes();
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element))
continue;
Element field = (Element)fieldNode;
// The name is trimmed; value, final and source text are used as-is.
if ("name".equals(field.getTagName()) && field.hasChildNodes())
attr = StringInterner.weakIntern(
((Text)field.getFirstChild()).getData().trim());
if ("value".equals(field.getTagName()) && field.hasChildNodes())
value = StringInterner.weakIntern(
((Text)field.getFirstChild()).getData());
if ("final".equals(field.getTagName()) && field.hasChildNodes())
finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
if ("source".equals(field.getTagName()) && field.hasChildNodes())
source.add(StringInterner.weakIntern(
((Text)field.getFirstChild()).getData()));
}
// The resource being loaded is always the last source entry.
source.add(name);
// Ignore this parameter if it has already been marked as 'final'
// (that check is presumably done inside loadProperty - confirm).
// Properties without a name are skipped.
if (attr != null) {
if (deprecations.getDeprecatedKeyMap().containsKey(attr)) {
// Deprecated key: the value is stored under each of its replacement keys.
DeprecatedKeyInfo keyInfo =
deprecations.getDeprecatedKeyMap().get(attr);
keyInfo.clearAccessed();
for (String key:keyInfo.newKeys) {
// update new keys with deprecated key's value
loadProperty(toAddTo, name, key, value, finalParameter,
source.toArray(new String[source.size()]));
}
}
else {
loadProperty(toAddTo, name, attr, value, finalParameter,
source.toArray(new String[source.size()]));
}
}
}
// InputStream resources: merge the collected values into the caller's
// properties and return them wrapped as a Resource.
if (returnCachedProperties) {
overlay(properties, toAddTo);
return new Resource(toAddTo, name);
}
return null;
} catch (IOException e) {
LOG.fatal("error parsing conf " + name, e);
throw new RuntimeException(e);
} catch (DOMException e) {
LOG.fatal("error parsing conf " + name, e);
throw new RuntimeException(e);
} catch (SAXException e) {
LOG.fatal("error parsing conf " + name, e);
throw new RuntimeException(e);
} catch (ParserConfigurationException e) {
LOG.fatal("error parsing conf " + name , e);
throw new RuntimeException(e);
}
}
/**
 * Returns the value of the {@code name} property after variable expansion.
 *
 * <p>The name is first passed through handleDeprecation, which may yield
 * several names. Each of them is looked up and run through substituteVars,
 * which expands references such as <code>${other.name}</code> in the value by
 * string replacement; the result for the last name is the one returned.
 *
 * @param name property name.
 * @return the expanded value for the last resolved name, or null when there is none.
 */
public String get(String name) {
  final String[] candidateNames = handleDeprecation(deprecationContext.get(), name);
  String resolved = null;
  for (int i = 0; i < candidateNames.length; i++) {
    final String rawValue = getProps().getProperty(candidateNames[i]);
    resolved = substituteVars(rawValue);
  }
  return resolved;
}
// Upper bound on the number of variable substitutions substituteVars performs
// on one value (20); exceeding it makes substituteVars throw.
private static final int MAX_SUBST = 20;
/**
 * Attempts to repeatedly expand the value {@code expr} by replacing the
 * left-most substring of the form "${var}". The replacement is looked up as
 * follows:
 * <ol>
 * <li>if var has the form "env.NAME", via {@code getenv("NAME")};
 *     "env.NAME:-default" falls back to the default when the result is null
 *     or empty, and "env.NAME-default" when it is null</li>
 * <li>otherwise via {@code getProperty(var)} (the Java system property)</li>
 * <li>if the result is still null, via {@code getRaw(var)}</li>
 * </ol>
 *
 * If var is unbound the current state of expansion "prefix${var}suffix" is
 * returned.
 * <p>
 * This function also detects self-referential substitutions, i.e.
 * <pre>
 * {@code
 * foo.bar = ${foo.bar}
 * }
 * </pre>
 * If a cycle is detected then the original expr is returned. Loops
 * involving multiple substitutions are not detected.
 *
 * @param expr the literal value of a config key
 * @return null if expr is null, otherwise the value resulting from expanding
 * expr using the algorithm above.
 * @throws IllegalStateException when more than
 * {@link Configuration#MAX_SUBST} replacements are required
 */
// Author's note: older versions did the expansion with regular expressions;
// this version works on the string directly.
private String substituteVars(String expr) {
if (expr == null) {
return null;
}
String eval = expr;
for(int s = 0; s < MAX_SUBST; s++) {
// Locate the next variable name; a start index of -1 means none is left.
final int[] varBounds = findSubVariable(eval);
if (varBounds[SUB_START_IDX] == -1) {
return eval;
}
final String var = eval.substring(varBounds[SUB_START_IDX],
varBounds[SUB_END_IDX]);
String val = null;
try {
if (var.startsWith("env.") && 4 < var.length()) {
// Environment reference: strip the "env." prefix and scan for a default
// separator (":-" or "-").
String v = var.substring(4);
int i = 0;
for (; i < v.length(); i++) {
char c = v.charAt(i);
if (c == ':' && i < v.length() - 1 && v.charAt(i + 1) == '-') {
val = getenv(v.substring(0, i)); // look up the variable named before ":-"
// ":-" form: the default also replaces an empty value.
if (val == null || val.length() == 0) {
val = v.substring(i + 2);
}
break;
} else if (c == '-') {
val = getenv(v.substring(0, i));
// "-" form: the default is used only when the lookup returned null.
if (val == null) {
val = v.substring(i + 1);
}
break;
}
}
// No separator found: the whole remainder is the variable name.
if (i == v.length()) {
val = getenv(v);
}
} else {
val = getProperty(var);
}
} catch(SecurityException se) {
LOG.warn("Unexpected SecurityException in Configuration", se);
}
// Fall back to the configuration's own raw value for var.
if (val == null) {
val = getRaw(var);
}
if (val == null) {
return eval; // return literal ${var}: var is unbound
}
// Widen the bounds to include the surrounding "${" and "}".
final int dollar = varBounds[SUB_START_IDX] - "${".length();
final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length();
final String refVar = eval.substring(dollar, afterRightBrace);
// detect self-referential values
if (val.contains(refVar)) {
return expr; // return original expression if there is a loop
}
// substitute
eval = eval.substring(0, dollar)
+ val
+ eval.substring(afterRightBrace);
}
throw new IllegalStateException("Variable substitution depth too large: "
+ MAX_SUBST + " " + expr);
}
/**
 * Looks up a Java system property.
 *
 * <p>substituteVars calls this before falling back to the configuration's own
 * values, so a system property (for example one passed on the command line as
 * {@code -Dname=value}) takes priority during variable expansion.
 *
 * @param key system property name.
 * @return the system property value, or null if it is not set.
 */
String getProperty(String key) {
  final String systemValue = System.getProperty(key);
  return systemValue;
}
================================接口
// Interface: each implementer supplies the behavior. It lets an object be
// further initialized from a Configuration instance.
/** Something that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Configurable {
/**
 * Set the configuration to be used by this object.
 *
 * @param conf the configuration to use.
 */
void setConf(Configuration conf);
/**
 * Return the configuration used by this object.
 *
 * @return the configuration used by this object.
 */
Configuration getConf();
}
================================总结
hadoop的配置文件管理采用的是JAXP的DOM解析xml
其实配置文件管理的类里(Configuration),就是get* 和 set*
get*会比较复杂,set*会简单很多,就是对一些像properties,overlay,finalParameters等成员变量的setProperty(),保存传入的键值对
配置系统是复杂软件必不可少的部分,其中资源加载,资源合并,和属性扩展等都是比较重要的处理过程
|
|