分享

about云日志分析项目准备11-1:spark streaming+spark sql 实现业务

Oner 发表于 2017-4-23 20:31:03 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 0 3607
接上篇:about云日志分析项目准备11:spark streaming 接收 flume 监控目录的日志文件
这篇主要讲解如何使用spark streaming在本地实现业务逻辑

一、工程结构

整个工程分为以下几个模块
business:业务代码
conf:配置信息
preprocessing:日志预处理过程
util:工具类
Run:启动程序
QQ截图20170423194825.jpg

二、Run

这是一个入口类,包含了spark streaming程序的启动和结果输出相关的程序
  1. import java.text.SimpleDateFormat
  2. import java.util.{Calendar, Date}

  3. import business.StaSQL
  4. import conf.Config
  5. import org.apache.log4j.{Level, Logger}
  6. import org.apache.spark.sql.hive.HiveContext
  7. import org.apache.spark.sql.types.{StringType, StructField, StructType}
  8. import org.apache.spark.streaming.dstream.DStream
  9. import org.apache.spark.streaming.{Seconds, StreamingContext}
  10. import org.apache.spark.{SparkConf, SparkContext}
  11. import preprocessing.PreProcessing
  12. import util.Tools

  13. /**
  14.   * 入口类
  15.   */
  16. object Run {
  17.   def main(args: Array[String]): Unit = {

  18.     Logger.getRootLogger.setLevel(Level.WARN)

  19.     val date = new Date()
  20.     val calendar = Calendar.getInstance()
  21.     calendar.add(Calendar.DATE, -1)
  22.     val total_time = calendar.getTime()
  23.     val formatter = new SimpleDateFormat("yyyyMMdd")
  24.     var dateString = formatter.format(total_time)

  25.     // 创建SparkConf对象,并指定AppName和Master
  26.     val conf = new SparkConf()
  27.       .setAppName("StartProgram")
  28.       .setMaster("local[2]")

  29.     val sc = new SparkContext(conf)
  30.     val sqlContext = new HiveContext(sc)
  31.     // 创建StreamingContext对象
  32.     val ssc = new StreamingContext(sc, Seconds(10))

  33.     // 本地读取日志文件
  34.     val dir = "D:\\work\\aboutyun_log_analysis\\log\"
  35.     val raw_log: DStream[String] = ssc.textFileStream(dir)

  36.     val pp = PreProcessing(ssc, sqlContext, raw_log)
  37.     val log_dstream = pp.transformLogFormat()

  38.     val schemaLogString = "date c_ip user_agent url url_part tid fid navigation"
  39.     val schemaLog =
  40.       StructType(
  41.         schemaLogString.split(" ")
  42.           .map(fieldName => StructField(fieldName, StringType)))

  43.     // 用于统计业务的spark sql封装对象
  44.     val staSql = StaSQL(dateString)

  45.     // 针对每一个RDD,转为DataFrame,并使用spark sql来计算结果,并将结果输出到mysql中
  46.     log_dstream.foreachRDD{rdd =>
  47.       println("-------------------------------------")
  48.       sqlContext.createDataFrame(rdd, schemaLog).registerTempTable("user_behavior")
  49. //      sqlContext.sql("select * from user_behavior").show(20, false)
  50.       sqlContext.sql(staSql.sql_count_per_ip_pvs).rdd.foreachPartition(iter =>
  51.         Tools.rdd2mysql(Config.aboutyun_ip_sta_table, Config.aboutyun_ip_sta_table_schema, iter))
  52.       sqlContext.sql(staSql.sql_count_top_50_tid).rdd.foreachPartition(iter =>
  53.         Tools.rdd2mysql(Config.aboutyun_hot_post_sta_table, Config.aboutyun_hot_post_sta_table_schema, iter))
  54.       sqlContext.sql(staSql.sql_count_module_pvs).rdd.foreachPartition(iter =>
  55.         Tools.rdd2mysql(Config.aboutyun_module_sta_table, Config.aboutyun_module_sta_table_schema, iter))
  56.       sqlContext.sql(staSql.sql_count_navigation).rdd.foreachPartition(iter =>
  57.         Tools.rdd2mysql(Config.aboutyun_navigation_sta_table, Config.aboutyun_navigation_sta_table_schema, iter))
  58.       sqlContext.sql(staSql.sql_count_in_and_out_station_request).rdd.foreachPartition(iter =>
  59.         Tools.rdd2mysql(Config.aboutyun_station_sta_table, Config.aboutyun_station_sta_table_schema, iter))
  60.       sqlContext.sql(staSql.sql_count_per_ip_per_url_pvs).rdd.foreachPartition(iter =>
  61.         Tools.rdd2mysql(Config.aboutyun_suspicious_ip_sta_table, Config.aboutyun_suspicious_ip_sta_table_schema, iter))
  62.     }

  63.     ssc.start()
  64.     ssc.awaitTermination()

  65.   }

  66. }
复制代码

三、conf包

conf包中包括了Config伴生类,其中包含了论坛版块名称和id的映射关系、mysql的配置信息、以及输出结果的表名和字段名。
QQ截图20170423201107.jpg

四、preprocessing包

preprocessing包中包括了spark streaming程序的日志格式转化、以及一些辅助临时表的注册。
  1. class PreProcessing(ssc: StreamingContext, sqlContext: HiveContext, raw_log: DStream[String]) {

  2.   // 注册一些后面需要用到的关联表
  3.   def registerHelperTable(): Unit ={
  4.     val sc = ssc.sparkContext
  5.     val rddModuleMap = sc.parallelize(Config.moduleMap)
  6.       .map(s => Row.fromSeq(s.split(","))) // 构造板块名称和id映射关系

  7.     val schemaModuleMapString = "module id"
  8.     val schemaModuleMap =
  9.       StructType(
  10.         schemaModuleMapString.split(" ")
  11.           .map(fieldName => StructField(fieldName, StringType)))

  12.     sqlContext.createDataFrame(rddModuleMap, schemaModuleMap).registerTempTable("module_map")
  13.   }

  14.   // 将原始的日志格式转为需要用到的格式
  15.   def transformLogFormat(): DStream[Row] ={

  16.     registerHelperTable()
  17.     val url_regex = "https|http://.*(?=\\))".r  // 匹配出后面是")"的一个url即为包含在user-agent中的url
  18.     val tid_regex = "((?<=tid=)|(?<=thread-))\\d+".r  // 匹配出前面是"tid="或是"thread-"的数字记为帖子id
  19.     val fid_regex = "((?<=fid=)|(?<=forum-))\\d+".r  // 匹配出前面是"fid="或是"formu-"的数字记为板块id

  20.     val log_dstream = raw_log
  21.       .filter(text => !text.contains("#")) // 去除掉带有注释的行
  22.       .map(text => text.split(" ")) // 对每一行使用空格来切分
  23.       .filter(array => array(3).contains(".php")) // 保留带有".php"的内容
  24.       .map { array =>
  25.       val url = if (array(6) != "-") array(6) else url_regex.findFirstIn(array(5)).getOrElse("")
  26.       val tid = tid_regex.findFirstIn(url).getOrElse("")
  27.       val fid = fid_regex.findFirstIn(url).getOrElse("")
  28.       // println(url)
  29.       val requestParameter =  url match {
  30.         case "http://www.aboutyun.com/home.php" => ""
  31.         case _ =>
  32.           if (url.split("\\?").length >1) url.split("\\?")(1) else "其他"
  33.       }
  34.       // if (url.split("\\?").length == 1 && url.eq("http://www.aboutyun.com/home.php")) "" else url.split("\\?")(1)
  35.       val navigation = requestParameter match {
  36.         case "mod=guide" => "导读"
  37.         case "mod=space&do=blog" => "博客"
  38.         case "mod=collection" => "专题"
  39.         case "mod=space&do=share" => "阅读分享"
  40.         case "" => "浏览"
  41.         case _ => "其他"
  42.       }
  43.       Row(array(0) + " " + array(1), array(4), array(5), url, array(3), tid, fid, navigation)
  44.     }
  45.     log_dstream
  46.   }
  47.   
  48. }
复制代码

五、 business包

business包中的StaSQL包含了真正计算业务用到的Spark SQL语句。
QQ截图20170423201637.jpg


六、util包

util包中包括了我们用到的输出结果表的建表语句和将rdd写入到mysql。

QQ截图20170423201933.jpg
  1. package util

  2. import java.sql.{Connection, DriverManager, PreparedStatement}

  3. import conf.Config

  4. /**
  5.   * 封装的工具类
  6.   */
  7. object Tools {

  8.   // 将rdd的内容输出到mysql中
  9.   def rdd2mysql(tableName: String, schema: String, iterator: Iterator[org.apache.spark.sql.Row]): Unit ={

  10.     def getFieldAndTypeTuple(schema: String): Array[(String, String)] = {
  11.       schema.split(",").map{s =>
  12.         val Array(field, fieldType) = s.trim.split(":")
  13.         (field, fieldType)
  14.       }
  15.     }

  16.     val iterArray = iterator.toArray
  17.     val fieldAndTypeTuple: Array[(String, String)] = getFieldAndTypeTuple(schema)
  18.     val fields = fieldAndTypeTuple.map(_._1).mkString(",")
  19.     val values = (for (i <- 0 until fieldAndTypeTuple.length) yield "?").mkString(",")

  20.     var conn: Connection = null
  21.     var ps: PreparedStatement = null
  22.     val sql = "insert into %s (%s) values (%s)".format(tableName, fields, values)
  23.     try {
  24. //      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/recommenddb", "root", "nopassword")
  25.       conn = DriverManager.getConnection(Config.mysqlUrl, Config.mysqlUser, Config.mysqlPassword)
  26.       conn.setAutoCommit(false)
  27.       ps = conn.prepareStatement(sql)
  28.       // row即为每个rdd中的row
  29.       //      println(sql)
  30.       //      println(iterArray.mkString("@"))
  31.       for (row <- iterArray){
  32.         ps.clearBatch()
  33.         for (i <- 0 until row.length){
  34.           val fieldType = fieldAndTypeTuple(i)._2
  35.           if ( fieldType == "string"){
  36.             ps.setString(i + 1, row(i).toString)
  37.           }else if (fieldType == "int"){
  38.             ps.setInt(i + 1, row(i).toString.toInt)
  39.           }else if (fieldType == "double"){
  40.             ps.setDouble(i + 1, row(i).toString.toDouble)
  41.           }

  42.         }
  43.         ps.addBatch()
  44.         ps.executeBatch()
  45.       }

  46.       conn.commit()

  47.     } catch {
  48.       case e: Exception => println(e)
  49.     } finally {
  50.       if (ps != null) {
  51.         ps.close()
  52.       }
  53.       if (conn != null) {
  54.         conn.close()
  55.       }
  56.     }

  57.   }
  58. }
复制代码

注:完整代码见:链接:http://pan.baidu.com/s/1bp2AuuF 密码:cn7a


没找到任何评论,期待你打破沉寂

关闭

推荐上一条 /2 下一条