TridentTopology tridentTopology = new TridentTopology(); Stream stream = tridentTopology.newStream("event", kafkaSpout).parallelismHint(5);
Stream logStream = stream.each(new Fields("bytes"), new LogExtractorFunction(), new Fields("eventlog")) .filter(new LogFilelter("startGame")) .each(new Fields("eventlog"), new LogGroupFunction(), new Fields("eventTimeStr"));
Options<OpaqueValue> opts = new Options<OpaqueValue>(); opts.dataTypeDescription = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
logStream.groupBy(new Fields("eventTimeStr")) .persistentAggregate(RedisMapState.opaque(poolConfig, opts), new Count(), new Fields("count")) .parallelismHint(5);
return tridentTopology.build();
为什么统计的结果不对呢。就是一个简单的事件名称拼上时间,统计发生多少次。每次count结果都不对
|