分享

Flume(ng) 自定义sink实现和属性注入

问题导读:
1.如何实现flume端自定一个sink,来按照我们的规则来保存日志?
2.想从flume的配置文件中获取rootPath的值,该如何配置?






最近需要利用flume来做收集远端日志,所以学习一些flume最基本的用法。这里仅作记录。

远端日志收集的整体思路是远端自定义实现log4j的appender把消息发送到flume端,flume端自定义实现一个sink来按照我们的规则保存日志。

自定义Sink代码:

  1. public class LocalFileLogSink extends AbstractSink implements Configurable {
  2.      private static final Logger logger = LoggerFactory
  3.               . getLogger(LocalFileLogSink .class );
  4.             private static final String PROP_KEY_ROOTPATH = "rootPath";
  5.       private String rootPath;
  6.      @Override
  7.      public void configure(Context context) {
  8.           String rootPath = context.getString(PROP_KEY_ROOTPATH );
  9.           setRootPath(rootPath);
  10.      }
  11.            
  12.           @Override
  13.           public Status process() throws EventDeliveryException {
  14.            logger .debug("Do process" );
  15.        }
  16. }
复制代码
实现Configurable接口,即可在初始化时,通过configure方法从context中获取配置的参数的值。这里,我们是想从flume的配置文件中获取rootPath的值,也就是日志保存的根路径。在flume-conf.properties中配置如下:
  1. agent.sinks = loggerSink
  2. agent.sinks.loggerSink.rootPath = ./logs
复制代码

loggerSink是自定义sink的名称,我们取值时的key,只需要loggerSink后面的部分即可,即这里的rootPath。

实际业务逻辑的执行,是通过继承复写AbstractSink中的process方法实现。从基类的getChannel方法中获取信道,从中取出Event处理即可。

  1. Channel ch = getChannel();
  2.            Transaction txn = ch.getTransaction();
  3.          txn.begin();
  4.           try {
  5.               logger .debug("Get event." );
  6.              Event event = ch.take();
  7.              txn.commit();
  8.              status = Status. READY ;
  9.              return status;
  10.                    }finally {
  11.              Log. info( "trx close.");
  12.              txn.close();
  13.          }
复制代码










已有(6)人评论

跳转到指定楼层
以梦为马 发表于 2014-8-14 18:03:11
在配置文件中如何确定执行的是你写的sink啊?????
回复

使用道具 举报

JustEnding 发表于 2015-1-6 17:25:27
请问自定义sink的打包和导入过程是怎么样的?
回复

使用道具 举报

nextuser 发表于 2015-1-6 18:30:32
JustEnding 发表于 2015-1-6 17:25
请问自定义sink的打包和导入过程是怎么样的?
这个可能对你有所帮助
flume 插件开发介绍

回复

使用道具 举报

JustEnding 发表于 2015-1-6 19:52:23
nextuser 发表于 2015-1-6 18:30
这个可能对你有所帮助
flume 插件开发介绍

好的,谢谢
另外,我想知道在linux中,flume-ng用到的链接库jar包,是用mvn打包的,那么如何打包呢?为什么我一直出错呢
回复

使用道具 举报

tntzbzc 发表于 2015-1-6 20:14:58
JustEnding 发表于 2015-1-6 19:52
好的,谢谢
另外,我想知道在linux中,flume-ng用到的链接库jar包,是用mvn打包的,那么如何打包呢?为 ...
使用maven的,参考这个
Flume-ng生产环境实践(一)Flume-ng生产环境编译
回复

使用道具 举报

dengrijie 发表于 2016-12-16 15:42:51
不太懂,学习中。。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条