问题导读
1、学习将日志文件打印到local,需要准备什么?
2、一个正式环境系统的系统设计包括什么?
3、怎样使用storm将日志文件打印到local?
阅读前提:
1 : 您可能需要对 logback 日志系统有所了解
2 :您可能需要对于 kafka 有初步的了解
3:请代码查看之前,请您仔细参考系统的业务图解
由于kafka本身自带了和『Hadoop』的接口,如果需要将kafka中的文件直接迁移到HDFS,请参看本ID的另外一篇博文:
业务系统-kafka-Storm【日志本地化】 - 2 :直接通过kafka将日志传递到HDFS
1: 一个正式环境系统的系统设计图解:
通过kafka集群,在2个相同的topic之下,通过kafka-storm, he kafka-hadoop,2 个Consumer,针对同样的一份数据,我们分流了2个管道:
其一: 实时通道
其二:离线通道
在日志本地化的过程之中,前期,由于日志的清洗,过滤的工作是放在Storm集群之中,也就是说,留存到本地locla的日志。是我们在Storm集群之中进行了清洗的数据。
也就是:
如下图所示:
在kafka之中,通常而言,有如下的 代码 用来处理:
在这里我们针对了2种日志,有两个Consumer用来处理
- package com.mixbox.kafka.consumer;
-
- public class logSave {
-
- public static void main(String[] args) throws Exception {
-
- Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit);
- visitlog.start();
-
- Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order);
- orderlog.start();
-
- }
- }
复制代码
在这里,我们依据不同的原始字段,将不同的数据保存到不同的文件之中。
- package com.mixbox.kafka.consumer;
-
- import java.io.UnsupportedEncodingException;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- import kafka.message.MessageAndMetadata;
-
- /**
- * @author Yin Shuai
- */
- public class Consumer_Thread extends Thread {
-
- // 在事实上我们会依据传递的topic名称,来生成不桐的记录机器
- // private Logger _log_order = LoggerFactory.getLogger("order");
- // private Logger _log_visit = LoggerFactory.getLogger("visit");
-
- private Logger _log = null;
-
- private final ConsumerConnector _consumer;
- private final String _topic;
-
- public Consumer_Thread(String topic) {
-
- _consumer = kafka.consumer.Consumer
- .createJavaConsumerConnector(createConsumerConfig());
- this._topic = topic;
-
- _log = LoggerFactory.getLogger(_topic);
-
- System.err.println("log的名称" + _topic);
-
- }
-
- private static ConsumerConfig createConsumerConfig() {
- Properties props = new Properties();
- props.put("zookeeper.connect", KafkaProperties.zkConnect);
- // 在这里我们的组ID为logSave
- props.put("group.id", KafkaProperties.logSave);
- props.put("zookeeper.session.timeout.ms", "100000");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
- return new ConsumerConfig(props);
-
- }
-
- public void run() {
-
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(_topic, new Integer(1));
-
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = _consumer
- .createMessageStreams(topicCountMap);
-
- for (KafkaStream<byte[], byte[]> kafkaStream : consumerMap.get(_topic)) {
- ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
- while (iterator.hasNext()) {
- MessageAndMetadata<byte[], byte[]> next = iterator.next();
- try {
-
- // 在这里我们分拆了一个Consumer 来处理visit日志
- logFile(next);
- System.out.println("message:"
- + new String(next.message(), "utf-8"));
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private void logFile(MessageAndMetadata<byte[], byte[]> next)
- throws UnsupportedEncodingException {
- _log.info(new String(next.message(), "utf-8"));
- }
-
- }
复制代码
一个简单的小tips:
logback.xml ,提醒您注意,这里的配置文件太过粗浅。如有需要,请自行填充。
复制代码
|