日志这个东西呢,说重要非常重要,做数据挖掘和分析都全靠它了。说不重要也不重要,毕竟不是用户数据。不管怎么样我们还是希望得到一个可靠的日志收集系统。
Flume本身提供了failover机制,可以自动切换和恢复。在我们的实践中,有多个产生日志的服务器分布在全球不同地方的机房,然后要把所有的日志都收集到一个集中存放的存储中。这里我简化了整个结构做一个例子。
1台game服务器,上面部署agent,这是整个数据源。其他game服务器上的配置类似。
2台collector服务器,主要是为了演示如何做failover
1台logserver服务器,主要是为了演示接收数据
主要配置文件如下(在一台服务器上模拟):
# flume/conf/game.conf
a1.channels = c1
a1.sources = r1
#配置两个以上的sink
a1.sinks = k1 k2
#这个是配置failover到关键,需要有一个sink group
a1.sinkgroups = g1
#仍然使用内存通道,当然你也可以用其他的
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
#继续使用文件source,tail命令
a1.sources.r1.command = tail -F /tmp/test.log
#加入两个拦截器,表示其来源
a1.sources.r1.interceptors = i1 i2
#静态拦截器,表示游戏的名称为PWI
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Game
a1.sources.r1.interceptors.i1.value = PWI
#timestamp拦截器
a1.sources.r1.interceptors.i2.type = timestamp
#配置第一个sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
#连接到第一个collector的地址
a1.sinks.k1.hostname = 127.0.0.1
a1.sinks.k1.port = 59221
#配置第二个sink
#跟第一个sink共享同一个channel,也可以配置另外一个channel
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
#连接到第二个collector到地址
a1.sinks.k2.hostname = 127.0.0.1
a1.sinks.k2.port = 59222
#配置sink group
a1.sinkgroups.g1.sinks = k1 k2
#处理的类型是failover
a1.sinkgroups.g1.processor.type = failover
#优先级,数字越大优先级越高,每个sink的优先级必须不相同
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
#设置为10秒,当然可以根据你的实际状况更改成更快或者很慢
a1.sinkgroups.g1.processor.maxpenalty = 10000
#flume/conf/collector1.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#avro source,监听本地IP和端口
a1.sources.r1.type = avro
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 59221
#设置一个拦截器,在Header中增加一些信息,表示路径
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = EU
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
#设置logserver到地址
a1.sinks.k1.hostname = 127.0.0.1
a1.sinks.k1.port = 59223
#flume/conf/collector2.conf 的修改非常简单,跟1几乎一样,修改了一个IP地址(59222),和拦截器的value(SC),不再赘述。
#flume/conf/logserver.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = avro
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 59223
#设置一个拦截器,加入路径信息,不设置也没关系
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = LogServer
a1.sources.r1.interceptors.i1.value = s1
a1.sources.r1.channels = c1
#直接输出到日志了,没有输出到文件。
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
配置完成,启动。启动到顺序不用考虑,flume自动会根据配置尝试连接。甚至在你启动之后,如果你要修改配置文件也不用重启,flume有自动重载配置文件的功能。
启动命令:bin/flume-ng agent –conf-file conf/game.conf –conf conf/ –name a1 -Dflume.root.logger=INFO,console
这个启动命令到参数都是什么意思呢?请使用 bin/flume-ng –help 查看
所有flume agent启动之后,在logserver端到屏幕上就会出现这样的日志:
2013-04-04 11:41:20,347 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{timestamp=1365103873783, LogServer=s1, Game=PWI, Collector=EU} body: 74 65 73 74 20 6C 6F 67 20 66 6F 72 20 66 6C 75 test log for flu }
如果我把collector1停掉之后,就出现了这样的日志:
2013-04-04 12:11:01,532 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{timestamp=1365102650530, LogServer=s1, Game=PWI, Collector=SC} body: 74 65 73 74 20 6C 6F 67 20 66 6F 72 20 66 6C 75 test log for flu }
请大家注意Event-headers里面的Collector,是不是已经变了?是的,flume发现collector1不能服务的时候自动将信息转到了collector2上面。这个时候我们重新启动collector1,就会发现logserver上面一开始的日志又回来了,说明flume探测到高优先级的sink已经可以使用了,重新把数据发送到了高优先级的collector1。