Questions this post addresses:
1. How do we start the cluster?
2. What does the project structure look like?
3. How are the processing results stored?
Solution:
1. Starting the cluster
1.1 Spark
1.2 ZooKeeper
1.3 Kafka
1.4 Flume
flume-ng agent --conf-file ~/opt/flume/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
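The original post only shows the Flume command; the start commands for the other components and the contents of single_agent.conf are not included. Below is a minimal sketch of both. The installation paths, the Kafka topic name "weblog", and the tailed log file path are assumptions, and the Kafka sink property names follow Flume 1.7 (older versions use topic / brokerList instead):

# Start ZooKeeper, the Kafka brokers (repeat per broker config) and the Spark standalone cluster
zkServer.sh start
kafka-server-start.sh -daemon ~/opt/kafka/config/server.properties
$SPARK_HOME/sbin/start-all.sh

# A possible ~/opt/flume/conf/single_agent.conf: tail an access log and push each line to Kafka
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/peerslee/logs/access.log
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = weblog
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092,localhost:9093,localhost:9094
a1.sinks.k1.channel = c1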
2. Project structure
2.1 delivery
Takes a StreamingContext and a comma-separated topics string, and returns a DStream of the message payloads.
import kafka.api.OffsetRequest
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

class KafkaMsg {
  def Processing(ssc: StreamingContext, topics: String, groupID: String): DStream[String] = {
    // One or more servers of the Kafka cluster, collectively called brokers
    val brokers = "localhost:9092,localhost:9093,localhost:9094"
    // Kafka topics to subscribe to
    val topicsSet = topics.split(",").toSet
    // Consumer parameters
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupID,
      // Read from the earliest message in Kafka every time the program starts
      "auto.offset.reset" -> OffsetRequest.SmallestTimeString
    )
    // Create a direct stream and keep only the message value
    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .map(_._2)
  }
}
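The post does not list the build dependencies this project needs. A minimal sketch of the sbt coordinates the delivery and storage layers presumably rely on; the Scala and artifact versions are assumptions for a Spark 2.x / Scala 2.11 setup:

// build.sbt (versions are assumptions)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"                % "2.1.0",
  "org.apache.spark"  %% "spark-sql"                 % "2.1.0",
  "org.apache.spark"  %% "spark-streaming"           % "2.1.0",
  "org.apache.spark"  %% "spark-streaming-kafka-0-8" % "2.1.0",
  "org.mongodb.spark" %% "mongo-spark-connector"     % "2.0.0"
)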
2.2 mongo utils
Takes a schema string that defines the MongoDB field keys, a rowRdd whose Row fields correspond to those keys, and the name of the target collection.
def put(schemaStr: String, rowRdd: RDD[Row], collection: String): Unit = {
  val sqlContext = SQLContext.getOrCreate(sc)
  // Build a schema in which every column is a nullable String
  val schema = StructType(schemaStr.split(",")
    .map(column => StructField(column, StringType, true)))
  val df = sqlContext.createDataFrame(rowRdd, schema)
  // Append the DataFrame to the given MongoDB collection
  MongoSpark.save(df.write.option("collection", collection).mode("append"))
}
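MongoSpark.save picks up the target database from the Spark configuration, which the post does not show. A minimal sketch of what that configuration presumably looks like when the SparkContext is created (host, port and database name are assumptions):

import org.apache.spark.{SparkConf, SparkContext}

// spark.mongodb.output.uri tells the Mongo Spark connector where to write;
// the collection is then chosen per call through the "collection" write option
val conf = new SparkConf()
  .setAppName("aboutyun-log-analysis")
  .set("spark.mongodb.output.uri", "mongodb://localhost:27017/aboutyun.default")
val sc = new SparkContext(conf)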
2.3 processing
2.3.1 Batch
Read the data from the local file system, implement the business logic with the basic RDD API, then wrap the results in Row objects and hand them to MongoUtils for storage.
def IpLocation(sc: SparkContext, mongoSQL: MongoSQL): Unit = {
  // map is an RDD transformation: apply f to every element and produce a new RDD
  val ipRuelsRdd = sc.textFile("/home/peerslee/spark_data/ip.txt").map(line => {
    val fields = line.trim().split("\t")
    val start_num = ip2num(fields(0).trim())
    val end_num = ip2num(fields(1).trim())
    val province = fields(2).trim()
    (start_num, end_num, province)
  })
  // Collect the rules RDD into a Scala array on the driver
  val ipRulesArray = ipRuelsRdd.collect()
  // Broadcast variable: a read-only value cached on every machine
  val ipRulesBroadcast = sc.broadcast(ipRulesArray)
  // Regex that matches an IPv4 address
  val ipPat = new Regex("((\\d{1,3}\\.){3}\\d{1,3})")
  // IP -> number of visits, read from the local log file
  val result = sc.textFile("/home/peerslee/spark_data/ex17032606.log")
    .map(line => { ipPat.findFirstIn(line.toString()).mkString("") })
    .map(ip => {
      var info: Any = None
      if (!ip.isEmpty) {
        val ipNum = ip2num(ip)
        val index = binarySearch(ipRulesBroadcast.value, ipNum)
        info = ipRulesBroadcast.value(index)
      }
      (info, 1L)
    })
    .reduceByKey(_ + _)
  // Total number of requests
  val total = result.reduce((x, y) => ("t", x._2 + y._2))
  // Visit ratio per location
  val rowRdd = result.map(x => {
    val r = x._2.toFloat / total._2
    (x._1, r)
  }).map(line => Row(line._1.toString, line._2.toString))
  val schemaStr = "loc,rate"
  mongoSQL.put(schemaStr, rowRdd, "IP_rate_2")
}
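The helpers ip2num and binarySearch are referenced but not shown in the post. A minimal sketch of what they could look like, assuming the rules are (startNum, endNum, province) tuples as built above; note the original code does not guard against a miss (index -1):

// Convert a dotted IPv4 string such as "1.2.3.4" into its numeric value
def ip2num(ip: String): Long = {
  ip.split("\\.").foldLeft(0L)((acc, part) => (acc << 8) + part.toLong)
}

// Binary search for the rule whose [start, end] range contains ipNum; returns -1 when no rule matches
def binarySearch(rules: Array[(Long, Long, String)], ipNum: Long): Int = {
  var low = 0
  var high = rules.length - 1
  while (low <= high) {
    val mid = (low + high) / 2
    if (ipNum >= rules(mid)._1 && ipNum <= rules(mid)._2) return mid
    else if (ipNum < rules(mid)._1) high = mid - 1
    else low = mid + 1
  }
  -1
}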
def Top50(sc: SparkContext, mongoSQL: MongoSQL): Unit = {
  // Match thread page URLs in the log
  val pat = "\\shttp://www\\.aboutyun\\.com/thread.*?\\.html.*?".r
  // Count the visits per thread URL, sort descending and keep the top 50
  val rddArr = sc.textFile("/home/peerslee/spark_data/ex17032606.log")
    .map(lines => pat.findFirstIn(lines.toString()).mkString("").trim()).filter(!_.isEmpty)
    .map(lines => (lines, 1L)).reduceByKey(_ + _).map(e => (e._2, e._1)).sortByKey(false) take 50
  val rowRdd = sc.parallelize(rddArr).map(line => Row(line._1.toString, line._2.toString))
  val schemaStr = "num,url"
  mongoSQL.put(schemaStr, rowRdd, "Top50_3")
}
def ModulePV(sc: SparkContext, sqlContext: SQLContext, MongoSQL: MongoSQL): Unit = {
  // Load the forum plate (board) id rules from a local JSON file
  val df = sqlContext.read.json("/home/peerslee/spark_data/boutyun_plate_id.json").select(
    "id", "name", "rank"
  )
  val plateIdRuels = df.collect()
  val plateIdBroadcast = sc.broadcast(plateIdRuels)
  val idPat = new Regex("fid=\\d+")
  // Number of visits per module (board)
  val rowRdd = sc.textFile("/home/peerslee/spark_data/ex17032606.log").map(lines =>
    idPat.findFirstIn(lines.toString()).mkString("").replace("fid=", "")).filter(!_.isEmpty).map(id => {
    var res: Any = None
    plateIdBroadcast.value.foreach(bc => {
      if (bc(0).equals(id)) {
        res = bc
      }
    })
    (res, 1L)
  }).reduceByKey(_ + _).map(e => (e._2, e._1)).sortByKey().filter(_._2 != None)
    .map(line => Row(line._1.toString, line._2.toString))
  val schemaStr = "num,type"
  MongoSQL.put(schemaStr, rowRdd, "module_pv_4")
}
def SearchUseNum(sc: SparkContext, MongoSQL: MongoSQL): Unit = {
  // Patterns for searches against the blog and the forum respectively
  val blogPat = ".*?\\shttp://www\\.aboutyun\\.com/search\\.php\\?mod=blog\\s.*?".r
  val forumPat = ".*?\\shttp://www\\.aboutyun\\.com/search\\.php\\?mod=forum\\s.*?".r
  // Number of times each search module is used
  val rowRdd = sc.textFile("/home/peerslee/spark_data/ex17032606.log").map(lines => {
    var res: Any = None
    lines match {
      case blogPat() => res = "blog"
      case forumPat() => res = "forum"
      case _ => res = "no"
    }
    res
  }).filter(!_.equals("no")).map((_, 1L)).reduceByKey(_ + _)
    .map(line => Row(line._1.toString, line._2.toString))
  val schemaStr = "search,num"
  MongoSQL.put(schemaStr, rowRdd, "search_use_num_5")
}
def Navi(sc: SparkContext, MongoSQL: MongoSQL): Unit = {
  // Patterns for the navigation entries of the site, read from the log
  val ddPat = ".*?\\shttp://www\\.aboutyun\\.com/forum\\.php\\?mod=guide\\s.*?".r
  val bkPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\?mod=space&do=blog\\s.*?".r
  val ztPat = ".*?\\shttp://www\\.aboutyun\\.com/forum\\.php\\?mod=collection\\s.*?".r
  val llPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\s.*?".r
  val ydPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\?mod=space&do=share\\s.*?".r
  // Number of visits per navigation entry
  val rowRdd = sc.textFile("/home/peerslee/spark_data/ex17032606.log").map(lines => {
    var res: Any = None
    lines match {
      case ddPat() => res = "guide"
      case bkPat() => res = "blog"
      case ztPat() => res = "collection"
      case llPat() => res = "home"
      case ydPat() => res = "space"
      case _ => res = "no"
    }
    res
  }).filter(!_.equals("no"))
    .map((_, 1L)).reduceByKey(_ + _)
    .map(line => Row(line._1.toString, line._2.toString))
  val schemaStr = "type,num"
  MongoSQL.put(schemaStr, rowRdd, "navi_6")
}
2.3.2 Stream
Read the DStream produced by KafkaMsg, implement the business logic, and use foreachRDD to wrap each batch and write it to the database.
Count the page views (PV): how many requests each visitor IP makes to the site. (UV, by contrast, would be how many distinct visitors the site has.)
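Several of the streaming functions below use ipPat and urlPat without defining them; presumably they live at the top of the object that holds these functions. A minimal sketch: ipPat is taken from the batch IpLocation job, while urlPat is an assumption about how DirectOrInDirect detects a referring site URL:

import scala.util.matching.Regex

// Same IPv4 pattern as in the batch IpLocation job
val ipPat = new Regex("((\\d{1,3}\\.){3}\\d{1,3})")
// Assumed referer pattern used by DirectOrInDirect to detect non-direct visits
val urlPat = new Regex("http://www\\.aboutyun\\.com/\\S*")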
def PV(dStream: DStream[String], mongoSQL: MongoSQL): Unit = {
  // Count the visits per IP address in each batch and store them
  dStream.map((line: String) => (ipPat.findFirstIn(line).mkString(""), 1L))
    .reduceByKey(_ + _).foreachRDD((rdd: RDD[(String, Long)]) => {
      val rowRdd = rdd.map(r => Row(r._1.toString, r._2.toString))
      val schemaStr = "ip,num"
      mongoSQL.put(schemaStr, rowRdd, "pv_1")
      println("1...ok")
    })
}
def DirectOrInDirect(dStream: DStream[String], mongoSQL: MongoSQL): Unit = {
  // Classify each request as a direct visit or a visit via a site URL (referer found)
  dStream.map(line => {
    val url = urlPat.findFirstIn(line)
    var key = "direct"
    if (!url.isEmpty) {
      key = "inDirect"
    }
    (key, 1L)
  }).reduceByKey(_ + _).foreachRDD((rdd: RDD[(String, Long)]) => {
    val rowRdd = rdd.map(r => Row(r._1.toString, r._2.toString))
    val schemaStr = "type,num"
    mongoSQL.put(schemaStr, rowRdd, "direct_or_indirect_7")
    println("2...ok")
  })
}
def DangerIP(rdd: DStream[String], sc: SparkContext, mongoSQL: MongoSQL): Unit = {
  // Count visits per IP and keep the 10 most active (potentially malicious) IPs per batch
  rdd.filter(_ != "").map(line => {
    // mkString("") unwraps the Option returned by findFirstIn (toString would yield "Some(...)")
    (ipPat.findFirstIn(line).mkString(""), 1L)
  }).reduceByKey(_ + _).map(i => (i._2, i._1.toString())).foreachRDD { (rdd: RDD[(Long, String)]) =>
    val sort = rdd.sortByKey(false) take 10 // an Array on the driver
    val schemaStr = "num,ip"
    val rowRdd = sc.parallelize(sort).map(r => Row(r._1.toString, r._2.toString))
    mongoSQL.put(schemaStr, rowRdd, "danger_IP_8")
    println("3...ok")
  }
}
def IPLocation(dStream: DStream[String], sc: SparkContext, mongoSQL: MongoSQL): Unit = {
  // Lazily-created broadcast of the IP location rules (see the singleton sketch below)
  val ipRulesBroadcast = IPRuels.getInstance(sc)
  val result = dStream.map(line => { ipPat.findFirstIn(line.toString()).mkString("") })
    .map(ip => {
      var info: Any = None
      if (!ip.isEmpty) {
        val ipNum = ip2num(ip)
        val index = binarySearch(ipRulesBroadcast.value, ipNum)
        info = ipRulesBroadcast.value(index)
      }
      (info, 1L)
    })
    .reduceByKey(_ + _)
  result.foreachRDD((rdd: RDD[(Any, Long)]) => {
    // Compute the total before the per-location ratio below; reduce on an empty
    // batch throws, hence the fallback to 1
    var total = 1L
    try {
      total = rdd.reduce((x, y) => ("t", x._2 + y._2))._2
    } catch {
      case ex: UnsupportedOperationException => {
        total = 1
      }
    }
    val rowRdd = rdd.map(x => {
      val r = x._2.toFloat / total
      (x._1, r)
    }).map(line => Row(line._1.toString, line._2.toString))
    val schemaStr = "loc,rate"
    mongoSQL.put(schemaStr, rowRdd, "IP_rate_2")
  })
}
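IPRuels.getInstance (and the analogous PlateIDRuels used by ModulePV below) is not shown in the post. It presumably follows the common lazily-initialized broadcast singleton pattern, so the rules are broadcast once per driver and reused by every batch. A minimal sketch, reusing the rules file path from the batch job and the ip2num helper sketched in 2.3.1:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object IPRuels {
  @volatile private var instance: Broadcast[Array[(Long, Long, String)]] = null

  // Build the broadcast once and reuse it for every streaming batch
  def getInstance(sc: SparkContext): Broadcast[Array[(Long, Long, String)]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val rules = sc.textFile("/home/peerslee/spark_data/ip.txt").map(line => {
            val fields = line.trim().split("\t")
            (ip2num(fields(0).trim()), ip2num(fields(1).trim()), fields(2).trim())
          }).collect()
          instance = sc.broadcast(rules)
        }
      }
    }
    instance
  }
}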
def Top50(dStream: DStream[String], sc: SparkContext, mongoSQL: MongoSQL): Unit = {
  val pat = "\\shttp://www\\.aboutyun\\.com/thread.*?\\.html.*?".r
  // Per batch: count visits per thread URL and keep the 50 most visited
  dStream.foreachRDD(rdd => {
    val rddArr = rdd.map(lines => pat.findFirstIn(lines.toString()).mkString("").trim()).filter(!_.isEmpty)
      .map(lines => (lines, 1L)).reduceByKey(_ + _).map(e => (e._2, e._1)).sortByKey(false) take 50
    val rowRdd = sc.parallelize(rddArr).map(line => Row(line._1.toString, line._2.toString))
    val schemaStr = "num,url"
    mongoSQL.put(schemaStr, rowRdd, "Top50_3")
  })
}
def ModulePV(dStream: DStream[String], sqlContext: SQLContext, MongoSQL: MongoSQL): Unit = {
  // Lazily-created broadcast of the plate (board) id rules
  val plateIdBroadcast = PlateIDRuels.getInstance(sqlContext)
  val idPat = new Regex("fid=\\d+")
  // Number of visits per module (board)
  dStream
    .map(lines =>
      idPat.findFirstIn(lines.toString()).mkString("").replace("fid=", "")).filter(!_.isEmpty).map(id => {
      var res: Any = None
      plateIdBroadcast.value.foreach(bc => {
        if (bc(0).equals(id)) {
          res = bc
        }
      })
      (res, 1L)
    }).reduceByKey(_ + _).map(e => (e._2, e._1)).foreachRDD(rdd => {
      val rowRdd = rdd.sortByKey().filter(_._2 != None)
        .map(line => Row(line._1.toString, line._2.toString))
      val schemaStr = "num,type"
      MongoSQL.put(schemaStr, rowRdd, "module_pv_4")
    })
}
def SearchUseNum(dStream: DStream[String], MongoSQL: MongoSQL): Unit = {
  // Number of times each search module (blog / forum) is used per batch
  dStream.foreachRDD(rdd => {
    val Rdd = rdd.map(lines => {
      val blogPat = ".*?\\shttp://www\\.aboutyun\\.com/search\\.php\\?mod=blog\\s.*?".r
      val forumPat = ".*?\\shttp://www\\.aboutyun\\.com/search\\.php\\?mod=forum\\s.*?".r
      var res: Any = None
      lines.toString match {
        case blogPat() => res = "blog"
        case forumPat() => res = "forum"
        case _ => res = "no"
      }
      res
    }).filter(!_.equals("no"))
    val rowRdd = Rdd.map((_, 1L)).reduceByKey(_ + _)
      .map(line => Row(line._1.toString, line._2.toString))
    val schemaStr = "search,num"
    MongoSQL.put(schemaStr, rowRdd, "search_use_num_5")
  })
}
def Navi(dStream: DStream[String], MongoSQL: MongoSQL): Unit = {
  // Patterns for the navigation entries of the site, read from the log
  val ddPat = ".*?\\shttp://www\\.aboutyun\\.com/forum\\.php\\?mod=guide\\s.*?".r
  val bkPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\?mod=space&do=blog\\s.*?".r
  val ztPat = ".*?\\shttp://www\\.aboutyun\\.com/forum\\.php\\?mod=collection\\s.*?".r
  val llPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\s.*?".r
  val ydPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\?mod=space&do=share\\s.*?".r
  // Number of visits per navigation entry
  dStream.map(lines => {
    var res: Any = None
    lines match {
      case ddPat() => res = "guide"
      case bkPat() => res = "blog"
      case ztPat() => res = "collection"
      case llPat() => res = "home"
      case ydPat() => res = "space"
      case _ => res = "no"
    }
    res
  }).filter(!_.equals("no"))
    .map((_, 1L)).reduceByKey(_ + _)
    .foreachRDD(rdd => {
      println(rdd.collect().toBuffer)
      val rowRdd = rdd.map(line => Row(line._1.toString, line._2.toString))
      val schemaStr = "type,num"
      MongoSQL.put(schemaStr, rowRdd, "navi_6")
    })
}
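Putting it together: the post does not show the streaming driver itself. A minimal sketch of a main that registers all of the jobs above on the DStream from section 2.1 and then starts the context. The app name, batch interval, topic name, consumer group, output URI, and the MongoSQL constructor are assumptions (see 2.2), and the processing functions are assumed to be in scope:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("aboutyun-log-analysis")
      .set("spark.mongodb.output.uri", "mongodb://localhost:27017/aboutyun.default")
    val ssc = new StreamingContext(conf, Seconds(10))
    val sc = ssc.sparkContext
    val sqlContext = SQLContext.getOrCreate(sc)
    val mongoSQL = new MongoSQL(sc) // assumed constructor, see 2.2

    // DStream of raw log lines coming from Flume through Kafka
    val lines = new KafkaMsg().Processing(ssc, "weblog", "log-analysis-group")

    // Register every streaming job on the same DStream
    PV(lines, mongoSQL)
    DirectOrInDirect(lines, mongoSQL)
    DangerIP(lines, sc, mongoSQL)
    IPLocation(lines, sc, mongoSQL)
    Top50(lines, sc, mongoSQL)
    ModulePV(lines, sqlContext, mongoSQL)
    SearchUseNum(lines, mongoSQL)
    Navi(lines, mongoSQL)

    ssc.start()
    ssc.awaitTermination()
  }
}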
3. Results
3.1 Top50
3.2 IP_rate
Full code download:
Link: https://pan.baidu.com/s/1c1BMjbq  Password: 7d6a