本帖最后由 Tank_2000 于 2016-12-11 22:10 编辑
[mw_shl_code=java,true]topology.newDRPCStream("top", drpc).each(new Fields("args"), new Split(“ ”), new Fields("time")).parallelismHint(5).stateQuery(myStates,new Fields("time"),new QueryPacketDB(),new Fields("srcip", "byt", "pkt")).groupBy(new Fields("srcip")).chainedAgg().aggregate(new Fields("byt"), new count(), new Fields("yt")).aggregate(new Fields("pkt"), new count(), new Fields("kt")).chainEnd().applyAssembly(new FirstN(10, "yt", true));
[/mw_shl_code]
使用ITridentSpout, 将上面的分组代码,稍微调整按时间分 分组试试 |