//val kafkaStream1 = KafkaUtils.createStream(ssc, zks, group, topicmaptmp)
println("---------------kafka----in-----------------------")
//统计次数
val result=scala.collection.mutable.HashMap[String,String]()
var times=0
var allTime:Long=0
val events: DStream[JSONObject] = kafkaStream.flatMap(line => {
println("-------------------------------------")
println(line._2)
println("-------------------------------------")
val data = JSONObject.fromObject(line._2)
// infologger.info(data.toString())
Some(data)
})
println()
println("---------------count times start------------------")
events.foreachRDD(x=>{
x.foreachPartition(data=>{
data.foreach(wtf=>{
//统计次数
println("---------------count times start222222)))))))------------------")
val mac = wtf.getString("MAC")
val time = wtf.getString("now_time")
if(result.contains("MAC")){
val lastTime = result.get("MAC").get
allTime+=new Date(time).getTime- new Date(lastTime).getTime
val hours=(new Date(time).getTime- new Date(lastTime).getTime)/1000/60
if(hours>2){
times+=1
result+=("MAC"->time)
}else{
result+=("MAC"->time)
}
}else{
result+=("MAC"->time)
}
})
})
})
println("times:----------------------"+times)
println("times:----------------------"+allTime)
println("---------------count times end------------------")
|
|