分享

业务系统-kafka-Storm-将日志文件打印到local

问题导读
1、学习将日志文件打印到local,需要准备什么?
2、一个正式环境系统的系统设计包括什么?
3、怎样使用storm将日志文件打印到local?



阅读前提:
        1 : 您可能需要对  logback 日志系统有所了解
        2 :您可能需要对于 kafka 有初步的了解
        3:请代码查看之前,请您仔细参考系统的业务图解


    由于kafka本身自带了和『Hadoop』的接口,如果需要将kafka中的文件直接迁移到HDFS,请参看本ID的另外一篇博文:
    业务系统-kafka-Storm【日志本地化】 - 2 :直接通过kafka将日志传递到HDFS

    1: 一个正式环境系统的系统设计图解:
1.png


             通过kafka集群,在2个相同的topic之下,通过kafka-storm, he kafka-hadoop,2 个Consumer,针对同样的一份数据,我们分流了2个管道:
            其一: 实时通道
            其二:离线通道

       在日志本地化的过程之中,前期,由于日志的清洗,过滤的工作是放在Storm集群之中,也就是说,留存到本地locla的日志。是我们在Storm集群之中进行了清洗的数据。

      也就是:

            如下图所示:
1.png


      在kafka之中,通常而言,有如下的 代码 用来处理:
         在这里我们针对了2种日志,有两个Consumer用来处理

  1. package com.mixbox.kafka.consumer;
  2. public class logSave {
  3.     public static void main(String[] args) throws Exception {
  4.         Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit);
  5.         visitlog.start();
  6.         Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order);
  7.         orderlog.start();
  8.     }
  9. }
复制代码

     在这里,我们依据不同的原始字段,将不同的数据保存到不同的文件之中。

  1. package com.mixbox.kafka.consumer;
  2. import java.io.UnsupportedEncodingException;
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.Properties;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import kafka.consumer.ConsumerConfig;
  10. import kafka.consumer.ConsumerIterator;
  11. import kafka.consumer.KafkaStream;
  12. import kafka.javaapi.consumer.ConsumerConnector;
  13. import kafka.message.MessageAndMetadata;
  14. /**
  15. * @author Yin Shuai
  16. */
  17. public class Consumer_Thread extends Thread {
  18.     // 在事实上我们会依据传递的topic名称,来生成不桐的记录机器
  19.     // private Logger _log_order = LoggerFactory.getLogger("order");
  20.     // private Logger _log_visit = LoggerFactory.getLogger("visit");
  21.     private Logger _log = null;
  22.     private final ConsumerConnector _consumer;
  23.     private final String _topic;
  24.     public Consumer_Thread(String topic) {
  25.         _consumer = kafka.consumer.Consumer
  26.                 .createJavaConsumerConnector(createConsumerConfig());
  27.         this._topic = topic;
  28.         _log = LoggerFactory.getLogger(_topic);
  29.         System.err.println("log的名称" + _topic);
  30.     }
  31.     private static ConsumerConfig createConsumerConfig() {
  32.         Properties props = new Properties();
  33.         props.put("zookeeper.connect", KafkaProperties.zkConnect);
  34.         // 在这里我们的组ID为logSave
  35.         props.put("group.id", KafkaProperties.logSave);
  36.         props.put("zookeeper.session.timeout.ms", "100000");
  37.         props.put("zookeeper.sync.time.ms", "200");
  38.         props.put("auto.commit.interval.ms", "1000");
  39.         return new ConsumerConfig(props);
  40.     }
  41.     public void run() {
  42.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  43.         topicCountMap.put(_topic, new Integer(1));
  44.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = _consumer
  45.                 .createMessageStreams(topicCountMap);
  46.         for (KafkaStream<byte[], byte[]> kafkaStream : consumerMap.get(_topic)) {
  47.             ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
  48.             while (iterator.hasNext()) {
  49.                 MessageAndMetadata<byte[], byte[]> next = iterator.next();
  50.                 try {
  51.                     // 在这里我们分拆了一个Consumer 来处理visit日志
  52.                     logFile(next);
  53.                     System.out.println("message:"
  54.                             + new String(next.message(), "utf-8"));
  55.                 } catch (UnsupportedEncodingException e) {
  56.                     e.printStackTrace();
  57.                 }
  58.             }
  59.         }
  60.     }
  61.     private void logFile(MessageAndMetadata<byte[], byte[]> next)
  62.             throws UnsupportedEncodingException {
  63.         _log.info(new String(next.message(), "utf-8"));
  64.     }
  65. }
复制代码


    一个简单的小tips:
        logback.xml  ,提醒您注意,这里的配置文件太过粗浅。如有需要,请自行填充。
  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <configuration>
  3.     <jmxConfigurator />
  4.     <!-- 控制台输出日志 -->
  5.     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  6.         <!-- 过滤掉 TRACE 和 DEBUG 级别的日志 -->
  7.         <!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> -->
  8.         <!-- <level>INFO</level> -->
  9.         <!-- </filter> -->
  10.         <!-- 按天来回滚,如果需要按小时来回滚,则设置为{yyyy-MM-dd_HH} -->
  11.         <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  12.             <fileNamePattern>f:/opt/log/test.%d{yyyy-MM-dd}.log</fileNamePattern>
  13.             <!-- 如果按天来回滚,则最大保存时间为1天,1天之前的都将被清理掉 -->
  14.         </rollingPolicy>
  15.         <!-- 日志输出格式 -->
  16.         <layout class="ch.qos.logback.classic.PatternLayout">
  17.             <pattern>
  18.                 %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
  19.                 %logger{36}-%msg%n</pattern>
  20.         </layout>
  21.     </appender>
  22.     <!-- 记录到日志 文件的滚动日志 -->
  23.     <appender name="ERROR"
  24.         class="ch.qos.logback.core.rolling.RollingFileAppender">
  25.         <file>
  26.             e:/logs/error/error.log
  27.         </file>
  28.         <filter class="ch.qos.logback.classic.filter.LevelFilter">
  29.             <level>
  30.                 ERROR
  31.             </level>
  32.             <onMatch>ACCEPT</onMatch>
  33.             <onMismatch>DENY</onMismatch>
  34.         </filter>
  35.         <!-- 定义每天生成一个日志文件 -->
  36.         <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  37.             <fileNamePattern>e:/logs/yuanshi-%d{yyyy-MM-dd}.log</fileNamePattern>
  38.             <MaxHistory>10</MaxHistory>
  39.         </rollingPolicy>
  40.         <!-- 日志样式 -->
  41.         <layout class="ch.qos.logback.classic.PatternLayout">
  42.             <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
  43.                 %logger{36}-%msg%n</pattern>
  44.         </layout>
  45.     </appender>
  46.     <!-- 记录到日志 文件的滚动日志 -->
  47.     <appender name="FILE"
  48.         class="ch.qos.logback.core.rolling.RollingFileAppender">
  49.         <file>E:\logs\file\file.log</file>
  50.         <filter class="ch.qos.logback.classic.filter.LevelFilter">
  51.             <level>INFO</level>
  52.             <onMatch>ACCEPT</onMatch>
  53.             <onMismatch>DENY</onMismatch>
  54.         </filter>
  55.         <!-- 定义每天生成一个日志文件 -->
  56.         <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  57.             <fileNamePattern>e:/logs/venality-%d{yyyy-MM-dd}.log
  58.             </fileNamePattern>
  59.             <MaxHistory>10</MaxHistory>
  60.         </rollingPolicy>
  61.         <!-- 日志样式 -->
  62.         <layout class="ch.qos.logback.classic.PatternLayout">
  63.             <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
  64.                 %logger{36}-%msg%n</pattern>
  65.         </layout>
  66.     </appender>
  67.     <appender name="visit"
  68.     class="ch.qos.logback.core.rolling.RollingFileAppender">
  69.         <File>
  70.             E:\logs\visitlog\visit.log
  71.         </File>
  72.         <encoder>
  73.             <pattern>%msg%n</pattern>
  74.         </encoder>
  75.         <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
  76.             <level>INFO</level>
  77.         </filter>
  78.         <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  79.             <fileNamePattern>E:\logs\visit.log.%d{yyyy-MM-dd}
  80.             </fileNamePattern>
  81.         </rollingPolicy>
  82.     </appender>
  83.     <logger name="visit" additivity="false" level="INFO">
  84.         <appender-ref ref="visit" />
  85.     </logger>
  86.     <appender name="order"
  87.         class="ch.qos.logback.core.rolling.RollingFileAppender">
  88.         <File>
  89.             E:\logs\orderlog\order.log
  90.         </File>
  91.         <encoder>
  92.             <pattern>%msg%n
  93.             </pattern>
  94.         </encoder>
  95.         <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
  96.             <level>INFO</level>
  97.         </filter>
  98.         <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  99.             <fileNamePattern>E:\logs\order.log.%d{yyyy-MM-dd}
  100.             </fileNamePattern>
  101.         </rollingPolicy>
  102.     </appender>
  103.     <logger name="order" additivity="false" level="INFO">
  104.         <appender-ref ref="order" />
  105.     </logger>
  106.     <root level="DEBUG">
  107.         <appender-ref ref="FILE" />
  108.     </root>
  109. </configuration>
复制代码





已有(1)人评论

跳转到指定楼层
GreenArrow 发表于 2014-9-24 21:31:34
学习学习啦
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条