分享

Flume-NG启动过程源码分析(一)(原创)

xioaxu790 2014-7-16 09:10:32 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 11483
问题导读

1、Flume起始于什么类?
2、no-reload-conf参数,决定采用的加载配置文件方式有哪几种?



  从bin/flume 这个shell脚本可以看到Flume的起始于org.apache.flume.node.Application类,这是flume的main函数所在。

  main方法首先会先解析shell命令,如果指定的配置文件不存在就甩出异常。

  根据命令中含有"no-reload-conf"参数,决定采用那种加载配置文件方式:一、没有此参数,会动态加载配置文件,默认每30秒加载一次配置文件,因此可以动态修改配置文件;二、有此参数,则只在启动时加载一次配置文件。实现动态加载功能采用了发布订阅模式,使用guava中的EventBus实现。
 
  1. EventBus eventBus = new EventBus(agentName + "-event-bus");
  2.         PollingPropertiesFileConfigurationProvider configurationProvider =
  3.             new PollingPropertiesFileConfigurationProvider(agentName,
  4.                 configurationFile, eventBus, 30); //这里是发布事件的类,这里的30则是动态加载配置文件时间间隔,单位是s
  5.         components.add(configurationProvider);
  6.         application = new Application(components);
  7.         eventBus.register(application);    //将订阅类注册到Bus中
复制代码

  订阅类是application = new Application(components);发布代码在PollingPropertiesFileConfigurationProvider中的FileWatcherRunnable.run方法中。在这只是先构建一个PollingPropertiesFileConfigurationProvider对象,PollingPropertiesFileConfigurationProvider extends  PropertiesFileConfigurationProvider implements LifecycleAware,继续跟踪PropertiesFileConfigurationProvider extends AbstractConfigurationProvider,再跟踪AbstractConfigurationProvider implements  ConfigurationProvider可以看到这些类的构造方法都是初始化,AbstractConfigurationProvid的构造方法初始化了sink、channel、source的工厂类。

  Application.handleConfigurationEvent(MaterializedConfiguration conf)有@Subscribe注解,是订阅方法,当eventBus.post(MaterializedConfiguration conf)执行时,会触发执行handleConfigurationEvent方法。

  new Application(components)时,会构建一个对象supervisor = new LifecycleSupervisor()会启动10个线程用来执行配置文件中的各个组件,并监控组件的整个运行过程。

  application.start()方法会启动配置文件的加载过程supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //LifecycleState.START开始运行,在这的component就是上面的PollingPropertiesFileConfigurationProvider对象。supervise方法会对component创建一个MonitorRunnable进程,并放入默认有10个线程的monitorService去执行
  1.    Supervisoree process = new Supervisoree();
  2.     process.status = new Status();
  3.     process.policy = policy;
  4.     process.status.desiredState = desiredState;
  5.     process.status.error = false;
  6.     MonitorRunnable monitorRunnable = new MonitorRunnable();
  7.     monitorRunnable.lifecycleAware = lifecycleAware;//组件
  8.     monitorRunnable.supervisoree = process;
  9.     monitorRunnable.monitorService = monitorService;
  10.     supervisedProcesses.put(lifecycleAware, process);
复制代码

    //创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。
  1.   ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
  2.         monitorRunnable, 0, 3, TimeUnit.SECONDS);  //启动MonitorRunnable,结束之后3秒再重新启动,可以用于重试
  3.     monitorFutures.put(lifecycleAware, future);
复制代码

  看MonitorRunnable类,其run方法主要是根据supervisoree.status.desiredState的值执行对应的操作。这里的lifecycleAware就是上面supervise方法中的component,lifecycleAware在构造之初将lifecycleState=IDLE,application.start()方法通过supervisor.supervise方法将supervisoree.status.desiredState=START。所以在run方法中会执行lifecycleAware.start(),也就是PollingPropertiesFileConfigurationProvider.start()方法。

  PollingPropertiesFileConfigurationProvider.start()方法会启动一个单线程FileWatcherRunnable每隔30s去加载一次配置文件(如果配置文件有修改):eventBus.post(getConfiguration())。getConfiguration()是AbstractConfigurationProvider.getConfiguration()这个方法解析了配置文件获取了所有组件及其配置属性。这个方法较为复杂,放在后续再讲解。

  待eventBus.post(getConfiguration())之后会触发Application.handleConfigurationEvent方法:
  1.   @Subscribe
  2.   public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  3.     stopAllComponents();
  4.     startAllComponents(conf);
  5.   }
复制代码

  stopAllComponents()方法会依次stop各个组件的运行,顺序是:source、sink、channel。之所以有顺序是因为:一、source是不停的读数据放入channel的;二、sink是不停的从channel拿数据的,channel两头都在使用应该最后停止,停止向channel发送数据后sink停止才不会丢数据。stop是通过supervisor.unsupervise方法来完成的。

  startAllComponents(conf)是启动各个组件的,顺序正好和stopAllComponents()停止顺序相反,相信大伙很容易理解。是通过supervisor.supervise启动组件的。另外需要注意的是启动channel组件后需要等待一定时间,是为了让所有channel全部启动。

  另外为什么要先stop再start呢?因为考虑到要动态加载配置文件啊,加载配置文件后就需要重新启动所有组件,所以先停止所有的,再重新启动所有的。

  main方法的最后还有一个钩子函数Runtime.getRuntime().addShutdownHook,主要是用来进行内存清理、对象销毁等操作。




已有(1)人评论

跳转到指定楼层
lbwahoo 发表于 2014-7-17 21:30:31
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条