本帖最后由 poppowerlb2 于 2015-6-3 00:02 编辑
问题导读
1.如何发送日志消息给Kafka?
2.怎样进行日志拓扑分析?
3.如何使用Trident实现指数加权移动平均?
4.怎样使用storm和XMPP协议发送警报和通知?
上接:http://www.aboutyun.com/forum.ph ... e=1&extra=#pid94943
最后的拓扑
我们现在已经有所有必要的组件来构建我们的日志分析拓扑如下: [mw_shl_code=java,true]public class LogAnalysisTopology {
public static StormTopologybuildTopology(){
TridentTopologytopology = newTridentTopology();
KafkaConfig.StaticHosts kafkaHosts=KafkaConfig.StaticHosts.fromHostString(
Arrays.asList(newString[]{"testserver"}), 1);
TridentKafkaConfigspoutConf = newTridentKafkaConfig(kafkaHosts, "log-analysis");
//spoutConf.scheme= newStringScheme();
spoutConf.scheme =newSchemeAsMultiScheme(new StringScheme());
spoutConf.forceStartOffsetTime(-1);
OpaqueTridentKafkaSpout spout = newOpaqueTridentKafkaSpout(spoutConf);
Stream spoutStream=topology.newStream("kafka-stream", spout);
Fields jsonFields =newFields("level","timestamp","message","logger");
Stream parsedStream=spoutStream.each(new
Fields("str"),newJsonProjectFunction(jsonFields), jsonFields);
// drop theunparsed JSON to reducetuple size
parsedStream=parsedStream.project(jsonFields);
EWMA ewma = newEWMA().sliding(1.0,
EWMA.Time.MINUTES).withAlpha(EWMA.ONE_MINUTE_ALPHA);
StreamaverageStream =parsedStream.each(new Fields("timestamp"),
new MovingAverageFunction(ewma,
EWMA.Time.MINUTES),newFields("average"));
ThresholdFilterFunction tff = newThresholdFilterFunction(50D);
StreamthresholdStream =averageStream.each(new Fields("average"), tff,
new Fields("change","threshold"));
StreamfilteredStream =
thresholdStream.each(newFields("change"), newBooleanFilter());
filteredStream.each(filteredStream.getOutputFields(),
new XMPPFunction(newNotifyMessageMapper()), new Fields());
returntopology.build();
}
public static void main(String[]args)throws
Exception {
Config conf = newConfig();
conf.put(XMPPFunction.XMPP_USER,"storm@budreau.local");
conf.put(XMPPFunction.XMPP_PASSWORD,"storm");
conf.put(XMPPFunction.XMPP_SERVER,"budreau.local");
conf.put(XMPPFunction.XMPP_TO,"tgoetz@budreau.local");
conf.setMaxSpoutPending(5);
if (args.length ==0) {
LocalClustercluster = new LocalCluster();
cluster.submitTopology("log-analysis", conf, buildTopology());
} else {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0],
conf, buildTopology());
}
}
}[/mw_shl_code] 然后,buildTopology()方法创建所有kafka spout和trident之间的流连接功能和过滤器。 main()方法然后提交拓扑到一个集群:如果是运行在本地模式就是本地集群或远程集群运行时模式就是 分布式模式。 我们开始通过配置kafka spout来读取我们来自应用程序配置写日志事件的同一个话题。因为kafka持久化所有接受的消息,因为我们的应用程序可能已经跑了一段时间(因此记录了许多事件),我们告诉spout快进到kafka的结束队列通过调用forceStartOffsetTime()方法的值为1。这将避免所有的旧消息的重播,我们可能不感兴趣。使用的值2将迫使spout回退队列,并使用一个特定的日期以毫秒为单位将迫使它回放到特定的时间点。如果forceFromStartTime()方法不调用,spout将尝试恢复,最后离开zookeeper通过查找一个偏移量。 接下来,我们设置JsonProjectFunction类来解析从kafka收到的原始JSON并释放出我们感兴趣的值。回想一下,trident的功能是附加的。这意味着我们的元组流,除了所有JSON,提取的值也会包含原始的未解析JSON字符串。因为我们不再需要这些数据,我们调用Stream.project()方法截取我们想要的字段列表。project()方法用于减少元组流,保留只是必要的字段,它尤为重要在实现大量数据流时。 最后,我们应用BooleanFilter类来连接产生的流给XMPPFunction类。 拓扑的main()方法简单填充一个XMPPFunction类配置对象所需的属性并提交拓扑。 运行日志分析拓扑
为了运行分析拓扑结构,首先确保本章早些时候提到的Zookeeper,kafka,OpenFire都是启动和运行。然后,运行拓扑的main()方法。 当拓扑激活时,storm XMPP用户连接到XMPP服务器并触发一个事件。如果你登录到XMPP的同一台服务器客户端和storm用户在你的好友列表,您将看到它变得可用。如下截图所示:
接下来,运行RogueApplication类,等待一分钟。您应该会收到即时消息通知,指示阈值被超过,然后紧随其后的是一个指示恢复正常(低于阈值),如下截图所示:
总结
在这一章,我们已经向您介绍了实时分析通过创建一个简单但强大的拓扑结构,可以适应范围广泛的应用程序。我们构建的组件是通用的,可以很容易地重用在其他项目和扩展。最后,我们介绍了一个实际的spout 实现,可用于多种用途。 而实时分析的话题非常广泛,诚然我们本章并不周详,我们鼓励你去探索在本书的其他章节的技术,并考虑他们如何可能被纳入你的分析工具。 在下一章,我们将向您介绍trident的分布式状态机制,构建一个应用程序不断地把storm处理的数据写入到图形数据库。
|