本帖最后由 desehawk 于 2017-2-27 17:34 编辑
问题导读
1.如何配置maven获取源码?
2.本文认为flume启动包含哪两个步骤?
3.源码如何实现获取启动配置的?
1源码编译
1 说明
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,目前已经是Apache的一个子项目。Flume是一个专用工具被设计为旨在往HDFS、HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。本文将详细分析Flume核心模块的源码实现。
2 下载、编译
2.1 源码检出
Flume源码的Git地址是:https://git-wip-us.apache.org/repos/asf/flume.git。本文采用的分支是flume-1.7。
2.2 源码编译
Flume采用Maven进行构建。本文采用IDEA作为编译、调试工具。
2.2.1 Maven依赖下载
配置阿里云的Maven服务器,速度较快,并可完成绝大多数包的下载。
[mw_shl_code=bash,true]<mirror>
<id>nexus-aliyun</id>
<mirrorOf>*</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>[/mw_shl_code]
使用阿里云Maven服务器,仍然有一些包无法解决,例如FlumeAuth模块的hadoop-minikdc和SolrSink的kite-morphlines-all等。由于这部分缺失的包,基本都是属于flume的插件模块之中,可直接关闭这些插件模块,不会影响主程序的执行和源码阅读。同时,还可移除其他的你不需要使用的source和sink和channel插件模块,可加快编译速度。 导入好的项目如下图所示:
其中比较重要的就是上图圈起来的模块了,其中flume-ng-core存放了最核心部分的代码,包含基础的Source、Channel、Sink等;flume-ng-node则是存放了程序启动的代码(入口函数)。 其他可能会用到的模块就是flume-ng-sources、flume-ng-channels、flume-ng-sinks,这3个模块存放了非必须的flume组件(flume-ng-core中未包含的),里面有些组件也是很常用的。 2.2.2 编译打包使用mvn clean install -Dmaven.test.skip=true进行打包,看到BUILD SUCCESS则代表编译成功,可进行下一步。 本步骤一定要有,不然debug的时候会报某些类找不到,这些类大多数是由avro定义的文件编译后才会生成的class文件。 2.2.3 简单测试在TestMemoryChannel类右键选择Run ‘TestMemoryChannel’运行JUnit单元测试。检查是否有报错。
2整体架构
Flume有三大组件:Source、Channel、Sink。 - Source就是数据来源,例如Web Server产生日志后,可使用ExecSource执行tail -F命令后不断监听日志文件新生成的数据,然后传给Channel。
- Channel就是一个缓存队列,由于读取数据和写入数据的速度可能不匹配,假如用同步完成的方式可能效率低下,所以Source把数据写到Channel这个队列里面,Sink再用另外的线程去读取。
- Sink就是最终的存储,例如可以是HDFS或LOG文件输出等,Sink负责去Channel里面读取数据,并存储。
在程序启动时,会启动所有的SourceRunner、Channel、SinkRunner。其中Channel的启动,没做什么特别的事情,就是初始化一下状态、创建一下计数器,算做一个被动的角色。比较重要的是SourceRunner和SinkRunner。 - SourceRunner会调用Source的start方法。以ExecSource为例,其start方法就是启动一个线程,去不断获取标准输出流写入一个列表(eventList),同时再启动一个线程去定期批量地把列表中的数据往Channel发,如下图所示。
- SinkRunner则是不断循环调用SinkProcess的process的方法,SinkProcess有几种类型,用于决定选择哪个Sink进行存储(Sink可以有多个),选择了Sink后,调用其process方法。Sink的process方法,主要做的就是去Channel中读取数据,并写入对应的存储,如下图所示。
3 程序入口
启动Flume的过程可以简单分为2个步骤:
1. 获取相关配置文件(一般来说就是flume-conf.properties)。
2. 启动各组件。不特别说明,本文中的组件是指实现了LifecycleAware接口的类的对象,一般就是Source、Channel、Sink这3种对象。
3.1 获取启动配置
3.1.1 Main函数
启动Flume的Main函数在flume-ng-node模块的org.apache.flume.node.Application。该函数的功能可以简单划分为以下三个步骤:
1. 使用commons.cli类获取命令行参数(就是启动时传入的参数)
2. 根据启动参数确定的读取配置的方式。读取配置的方式总共有4种,分别根据配置是保存在zookeeper上还是本地properties文件、以及是否reload(自动重载配置文件)分为4种方式。
3. 根据相应的配置启动程序,并注册关闭钩子。
接下来以properties文件、不重载的方式为例,主要的代码如下:
[mw_shl_code=java,true]PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName, configurationFile);
//创建Application对象,包含初始化组件列表(components),初始化LifecycleSupervisor。
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
//start方法用于检查所有组件是否是启动状态,如果不是则启动该组件。
application.start();
//监听程序关闭事件,用于当程序被kill后能够执行一些清理工作。
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
public void run() {
appReference.stop();
}
});[/mw_shl_code]
上面的代码,有两处比较关键:
configurationProvider.getConfiguration()会返回一个MaterializedConfiguration类型的对象,用于从文件形式的配置转为物化的配置,即包含实际的channel、sinkRunner等对象的实例,在“物化配置”一节分析。
handleConfigurationEvent用于停止所有components,并使用新的配置进行启动,在“使用新配置重启”一节分析。
4.1.2 物化配置
configurationProvider.getConfiguration()方法主要做了以下两件事:
1. 读取配置文件(flume-conf.properties),保存在AgentConfiguration对象中。
[mw_shl_code=java,true]public static class AgentConfiguration {
private final String agentName;
private String sources;
private String sinks;
private String channels;
private String sinkgroups;
private final Map<String, ComponentConfiguration> sourceConfigMap;
private final Map<String, ComponentConfiguration> sinkConfigMap;
private final Map<String, ComponentConfiguration> channelConfigMap;
private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
private Map<String, Context> sourceContextMap;
private Map<String, Context> sinkContextMap;
private Map<String, Context> channelContextMap;
private Map<String, Context> sinkGroupContextMap;
private Set<String> sinkSet;
private Set<String> sourceSet;
private Set<String> channelSet;
private Set<String> sinkgroupSet;
}[/mw_shl_code]
到这个步骤还仅仅是做好了分类的文本形式的配置项。
2. 创建出配置中的各组件实例,并添加到MaterializedConfiguration实例中。
[mw_shl_code=java,true]public interface MaterializedConfiguration {
public void addSourceRunner(String name, SourceRunner sourceRunner);
public void addSinkRunner(String name, SinkRunner sinkRunner);
public void addChannel(String name, Channel channel);
public ImmutableMap<String, SourceRunner> getSourceRunners();
public ImmutableMap<String, SinkRunner> getSinkRunners();
public ImmutableMap<String, Channel> getChannels();
}[/mw_shl_code]
在这个实例中,可以获取配置文件中配置的所有的source、channel、sink,并且是“物化”的,即可以直接取得相关组件的实例。
4.2 启动所有组件
4.2.1 使用新配置重启
有了上面的MaterializedConfiguration实例,我们就可以启动组件了。
在handleConfigurationEvent方法中,首先会停止所有组件,然后再启动所有组件。
[mw_shl_code=java,true]stopAllComponents();
startAllComponents(conf); //这里的conf就是上节的MaterializedConfiguration。[/mw_shl_code]
在startAllComponents方法中,会遍历组件列表(SourceRunners、SinkRunners、Channels),分别调用supervise方法。以Channel为例:
[mw_shl_code=java,true]for (Entry<String, Channel> entry :
materializedConfiguration.getChannels().entrySet()) {
try {
logger.info("Starting Channel " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
logger.error("Error while starting {}", entry.getValue(), e);
}
}[/mw_shl_code]
这个supervise方法简单来说,就是将相应组件的状态转化为期望的状态。例如上面代码中的LifecycleState.START就是期望的状态。
4.2.2 LifecycleSupervisor
上节的supervisor是一个LifecycleSupervisor对象。前面有说到,在创建Application的时候初始化了一个LifecycleSupervisor对象,就是这里的supervisor。这个对象,我理解为各组件生命周期的管理者,用于实时监控所有组件的状态,如果不是期望的状态(desiredState),则进行状态转换。
上节的代码中调用了supervisor.supervise方法,接下来分析一下supervise这个方法:
[mw_shl_code=java,true]public synchronized void supervise(LifecycleAware lifecycleAware,
SupervisorPolicy policy, LifecycleState desiredState) {
//省略状态检查的代码
Supervisoree process = new Supervisoree();
process.status = new Status();
process.policy = policy;
process.status.desiredState = desiredState;
process.status.error = false;
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
}[/mw_shl_code]
由于所有的组件都实现了LifecycleAware接口,所以这里的supervise方法传入的是LifecycleAware接口的对象。
可以看到创建了一个Supervisoree对象,顾名思义,就是被监控的的对象,该对象有以下几种状态:IDLE, START, STOP, ERROR。
scheduleWithFixedDelay每隔3秒触发一次监控任务(monitorRunnable)。
4.2.3 MonitorRunnable
在MonitorRunnable中主要是检查组件的状态,并实现从lifecycleState到desiredState的转变。
[mw_shl_code=java,true]switch (supervisoree.status.desiredState) {
case START:
try {
lifecycleAware.start();
} catch (Throwable e) {省略}
break;
case STOP:
try {
lifecycleAware.stop();
} catch (Throwable e) {省略}
break;
default:
logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState);
}[/mw_shl_code]
到这里为止,可以看到监控的进程,调用了组件自己的start和stop方法来启动、停止。前面有提到有3种类型的组件,SourceRunner、Channel、SinkRunner,而Channel的start只做了初始化计数器,没什么实质内容,所以接下来从SourceRunner的启动(从Source写数据到Channel)和SinkRunner的启动(从Channel获取数据写入Sink)来展开说明。
作者:Lnho
来自:csdn
|