分享

flume学习(三):Flume Interceptors的使用

xioaxu790 2014-10-13 20:02:50 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 34966
问题导读
1、如何理解flume拦截器?
2、如何使用regex_filter和 timestamp这两个拦截器来实现一个较强的功能?
3、怎样为source1添加了两个拦截器?






对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到source之前,对日志进行一些包装、清新过滤等等动作。

官方上提供的已有的拦截器有:
  1. Timestamp Interceptor
  2. Host Interceptor
  3. Static Interceptor
  4. Regex Filtering Interceptor
  5. Regex Extractor Interceptor
复制代码



像很多java的开源项目如springmvc中的拦截器一样,flume的拦截器也是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。
Timestamp Interceptor :在event的header中添加一个key叫:timestamp,value为当前的时间戳。这个拦截器在sink为hdfs 时很有用,后面会举例说到
Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip。
Static Interceptor:可以在event的header中添加自定义的key和value。
Regex Filtering Interceptor:通过正则来清洗或包含匹配的events。
Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

下面举例说明这些拦截器的用法,首先我们调整一下第一篇文章中的那个WriteLog类:
  1. public class WriteLog {  
  2.     protected static final Log logger = LogFactory.getLog(WriteLog.class);  
  3.   
  4.     /**
  5.      * @param args
  6.      * @throws InterruptedException
  7.      */  
  8.     public static void main(String[] args) throws InterruptedException {  
  9.         // TODO Auto-generated method stub  
  10.         while (true) {  
  11.             logger.info(new Date().getTime());  
  12.             logger.info("{"requestTime":"  
  13.                     + System.currentTimeMillis()  
  14.                     + ","requestParams":{"timestamp":1405499314238,"phone":"02038824941","cardName":"测试商家名称","provinceCode":"440000","cityCode":"440106"},"requestUrl":"/reporter-api/reporter/reporter12/init.do"}");  
  15.             Thread.sleep(2000);  
  16.   
  17.         }  
  18.     }  
  19. }  
复制代码



又多输出了一行日志信息,现在每次循环都会输出两行日志信息,第一行是一个时间戳信息,第二行是一行JSON格式的字符串信息。

接下来我们用regex_filter和 timestamp这两个拦截器来实现这样一个功能:
1 过滤掉LOG4J输出的第一行那个时间戳日志信息,只收集JSON格式的日志信息
2 将收集的日志信息保存到HDFS上,每天的日志保存到以该天命名的目录下面,如2014-7-25号的日志,保存到/flume/events/14-07-25目录下面。

修改后的flume.conf如下:
  1. tier1.sources=source1  
  2. tier1.channels=channel1  
  3. tier1.sinks=sink1  
  4.   
  5. tier1.sources.source1.type=avro  
  6. tier1.sources.source1.bind=0.0.0.0  
  7. tier1.sources.source1.port=44444  
  8. tier1.sources.source1.channels=channel1  
  9.   
  10. tier1.sources.source1.interceptors=i1 i2  
  11. tier1.sources.source1.interceptors.i1.type=regex_filter  
  12. tier1.sources.source1.interceptors.i1.regex=\\{.*\\}  
  13. tier1.sources.source1.interceptors.i2.type=timestamp  
  14.   
  15. tier1.channels.channel1.type=memory  
  16. tier1.channels.channel1.capacity=10000  
  17. tier1.channels.channel1.transactionCapacity=1000  
  18. tier1.channels.channel1.keep-alive=30  
  19.   
  20. tier1.sinks.sink1.type=hdfs  
  21. tier1.sinks.sink1.channel=channel1  
  22. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d  
  23. tier1.sinks.sink1.hdfs.fileType=DataStream  
  24. tier1.sinks.sink1.hdfs.writeFormat=Text  
  25. tier1.sinks.sink1.hdfs.rollInterval=0  
  26. tier1.sinks.sink1.hdfs.rollSize=10240  
  27. tier1.sinks.sink1.hdfs.rollCount=0  
  28. tier1.sinks.sink1.hdfs.idleTimeout=60  
复制代码



我们对source1添加了两个拦截器i1和i2,i1为regex_filter,过滤的正则为\\{.*\\},注意正则的写法用到了转义字符,不然source1无法启动,会报错。
i2为timestamp,在header中添加了一个timestamp的key,然后我们修改了sink1.hdfs.path在后面加上了/%y-%m-%d这一串字符,这一串字符要求event的header中必须有timestamp这个key,这就是为什么我们需要添加一个timestamp拦截器的原因,如果不添加这个拦截器,无法使用这样的占位符,会报错。还有很多占位符,请参考官方文档。

然后运行WriteLog,去hdfs上查看对应目录下面的文件,会发现内容只有JSON字符串的日志,与我们的功能描述一致。

相关文章:

flume学习(一):log4j直接输出日志到flume

flume学习(二):如何找到cm安装的flume的配置文件

flume学习(四):Flume Channel Selectors使用





已有(6)人评论

跳转到指定楼层
chinaboy2005 发表于 2014-11-28 22:03:58
东西挺好的,学习中
回复

使用道具 举报

eclipsesky 发表于 2014-12-5 14:37:59
好文章 赞楼主
回复

使用道具 举报

chinaboy2005 发表于 2014-12-5 16:55:38

东西挺好的,学习中
回复

使用道具 举报

cdb521007 发表于 2015-7-26 15:20:20
拦截器是个好东西,但是得自己用的好的前提下,呵呵呵
回复

使用道具 举报

为了明天time 发表于 2015-9-22 10:54:46
好文章 解决了疑惑
回复

使用道具 举报

louiscool2 发表于 2015-10-23 11:15:37
这里用的timestamp 拦截器,写到hdfs目录%y-%m-%d,如果我想用实际数据里的时间,来决定哪条数据该写到哪个时间目录下,该怎么做呢?
tier1.sources.source1.interceptors.i2.type=timestamp
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d  
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条