poppowerlb2 发表于 2015-6-1 21:21:12

使用Storm和Trident进行实时趋势分析(二)

本帖最后由 poppowerlb2 于 2015-6-3 00:02 编辑

问题导读

1.如何发送日志消息给Kafka?
2.怎样进行日志拓扑分析?
3.如何使用Trident实现指数加权移动平均?
4.怎样使用storm和XMPP协议发送警报和通知?

static/image/hrline/4.gif

上接:http://www.aboutyun.com/forum.ph ... e=1&extra=#pid94943

最后的拓扑
我们现在已经有所有必要的组件来构建我们的日志分析拓扑如下:public class LogAnalysisTopology {
    public static StormTopologybuildTopology(){
      TridentTopologytopology = newTridentTopology();
       KafkaConfig.StaticHosts kafkaHosts=KafkaConfig.StaticHosts.fromHostString(
                      Arrays.asList(newString[]{"testserver"}), 1);
      TridentKafkaConfigspoutConf = newTridentKafkaConfig(kafkaHosts, "log-analysis");
      //spoutConf.scheme= newStringScheme();
      spoutConf.scheme =newSchemeAsMultiScheme(new StringScheme());
       spoutConf.forceStartOffsetTime(-1);
       OpaqueTridentKafkaSpout spout = newOpaqueTridentKafkaSpout(spoutConf);
      Stream spoutStream=topology.newStream("kafka-stream", spout);
      Fields jsonFields =newFields("level","timestamp","message","logger");
      Stream parsedStream=spoutStream.each(new
               Fields("str"),newJsonProjectFunction(jsonFields), jsonFields);
      // drop theunparsed JSON to reducetuple size
      parsedStream=parsedStream.project(jsonFields);
      EWMA ewma = newEWMA().sliding(1.0,
            EWMA.Time.MINUTES).withAlpha(EWMA.ONE_MINUTE_ALPHA);
      StreamaverageStream =parsedStream.each(new Fields("timestamp"),
               new MovingAverageFunction(ewma,
                     EWMA.Time.MINUTES),newFields("average"));
       ThresholdFilterFunction tff = newThresholdFilterFunction(50D);
      StreamthresholdStream =averageStream.each(new Fields("average"), tff,
               new Fields("change","threshold"));
      StreamfilteredStream =
               thresholdStream.each(newFields("change"), newBooleanFilter());
      filteredStream.each(filteredStream.getOutputFields(),
               new XMPPFunction(newNotifyMessageMapper()), new Fields());
      returntopology.build();
    }
    public static void main(String[]args)throws
         Exception {
      Config conf = newConfig();
       conf.put(XMPPFunction.XMPP_USER,"storm@budreau.local");
       conf.put(XMPPFunction.XMPP_PASSWORD,"storm");
       conf.put(XMPPFunction.XMPP_SERVER,"budreau.local");
      conf.put(XMPPFunction.XMPP_TO,"tgoetz@budreau.local");
       conf.setMaxSpoutPending(5);
      if (args.length ==0) {
          LocalClustercluster = new LocalCluster();
         cluster.submitTopology("log-analysis", conf, buildTopology());
      } else {
         conf.setNumWorkers(3);
          StormSubmitter.submitTopology(args,
                   conf, buildTopology());
      }
    }
}然后,buildTopology()方法创建所有kafka spout和trident之间的流连接功能和过滤器。main()方法然后提交拓扑到一个集群:如果是运行在本地模式就是本地集群或远程集群运行时模式就是 分布式模式。我们开始通过配置kafka spout来读取我们来自应用程序配置写日志事件的同一个话题。因为kafka持久化所有接受的消息,因为我们的应用程序可能已经跑了一段时间(因此记录了许多事件),我们告诉spout快进到kafka的结束队列通过调用forceStartOffsetTime()方法的值为1。这将避免所有的旧消息的重播,我们可能不感兴趣。使用的值2将迫使spout回退队列,并使用一个特定的日期以毫秒为单位将迫使它回放到特定的时间点。如果forceFromStartTime()方法不调用,spout将尝试恢复,最后离开zookeeper通过查找一个偏移量。接下来,我们设置JsonProjectFunction类来解析从kafka收到的原始JSON并释放出我们感兴趣的值。回想一下,trident的功能是附加的。这意味着我们的元组流,除了所有JSON,提取的值也会包含原始的未解析JSON字符串。因为我们不再需要这些数据,我们调用Stream.project()方法截取我们想要的字段列表。project()方法用于减少元组流,保留只是必要的字段,它尤为重要在实现大量数据流时。最后,我们应用BooleanFilter类来连接产生的流给XMPPFunction类。拓扑的main()方法简单填充一个XMPPFunction类配置对象所需的属性并提交拓扑。运行日志分析拓扑
为了运行分析拓扑结构,首先确保本章早些时候提到的Zookeeper,kafka,OpenFire都是启动和运行。然后,运行拓扑的main()方法。当拓扑激活时,storm XMPP用户连接到XMPP服务器并触发一个事件。如果你登录到XMPP的同一台服务器客户端和storm用户在你的好友列表,您将看到它变得可用。如下截图所示:
接下来,运行RogueApplication类,等待一分钟。您应该会收到即时消息通知,指示阈值被超过,然后紧随其后的是一个指示恢复正常(低于阈值),如下截图所示:
总结
在这一章,我们已经向您介绍了实时分析通过创建一个简单但强大的拓扑结构,可以适应范围广泛的应用程序。我们构建的组件是通用的,可以很容易地重用在其他项目和扩展。最后,我们介绍了一个实际的spout 实现,可用于多种用途。而实时分析的话题非常广泛,诚然我们本章并不周详,我们鼓励你去探索在本书的其他章节的技术,并考虑他们如何可能被纳入你的分析工具。在下一章,我们将向您介绍trident的分布式状态机制,构建一个应用程序不断地把storm处理的数据写入到图形数据库。

fysoft2006 发表于 2015-6-15 18:20:34

谢谢分享!!!

邓立辉 发表于 2015-10-19 20:59:16

谢谢分享楼主

bbfj 发表于 2016-3-27 09:55:33

不错的,讲解的很详细

ningjianbang 发表于 2016-4-6 17:05:35

谢谢楼主分享,学习了

jmamike 发表于 2017-6-14 11:32:59

不错,有参考价值
页: [1]
查看完整版本: 使用Storm和Trident进行实时趋势分析(二)