Continuing from the previous post: aboutyun log analysis project preparation 11: Spark Streaming receiving log files from a Flume-monitored directory.
This post explains how to implement the business logic locally with Spark Streaming.
1. Project structure
The project is divided into the following modules:
business: business-logic code
conf: configuration information
preprocessing: log preprocessing
util: utility classes
Run: the program entry point
2. Run
This is the entry class; it starts the Spark Streaming job and writes the results out.
import java.text.SimpleDateFormat
import java.util.Calendar

import business.StaSQL
import conf.Config
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import preprocessing.PreProcessing
import util.Tools

/**
 * Entry class
 */
object Run {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)

    // Use yesterday's date, formatted as yyyyMMdd, as the statistics date
    val calendar = Calendar.getInstance()
    calendar.add(Calendar.DATE, -1)
    val total_time = calendar.getTime()
    val formatter = new SimpleDateFormat("yyyyMMdd")
    val dateString = formatter.format(total_time)

    // Create the SparkConf object and set the app name and master
    val conf = new SparkConf()
      .setAppName("StartProgram")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    // Create the StreamingContext with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))

    // Read log files from a local directory
    val dir = "D:\\work\\aboutyun_log_analysis\\log"
    val raw_log: DStream[String] = ssc.textFileStream(dir)
    val pp = PreProcessing(ssc, sqlContext, raw_log)
    val log_dstream = pp.transformLogFormat()

    val schemaLogString = "date c_ip user_agent url url_part tid fid navigation"
    val schemaLog =
      StructType(
        schemaLogString.split(" ")
          .map(fieldName => StructField(fieldName, StringType)))

    // Spark SQL wrapper object holding the statistics queries
    val staSql = StaSQL(dateString)

    // For each RDD: convert it to a DataFrame, run the Spark SQL statistics, and write the results to MySQL
    log_dstream.foreachRDD { rdd =>
      println("-------------------------------------")
      sqlContext.createDataFrame(rdd, schemaLog).registerTempTable("user_behavior")
      // sqlContext.sql("select * from user_behavior").show(20, false)
      sqlContext.sql(staSql.sql_count_per_ip_pvs).rdd.foreachPartition(iter =>
        Tools.rdd2mysql(Config.aboutyun_ip_sta_table, Config.aboutyun_ip_sta_table_schema, iter))
      sqlContext.sql(staSql.sql_count_top_50_tid).rdd.foreachPartition(iter =>
        Tools.rdd2mysql(Config.aboutyun_hot_post_sta_table, Config.aboutyun_hot_post_sta_table_schema, iter))
      sqlContext.sql(staSql.sql_count_module_pvs).rdd.foreachPartition(iter =>
        Tools.rdd2mysql(Config.aboutyun_module_sta_table, Config.aboutyun_module_sta_table_schema, iter))
      sqlContext.sql(staSql.sql_count_navigation).rdd.foreachPartition(iter =>
        Tools.rdd2mysql(Config.aboutyun_navigation_sta_table, Config.aboutyun_navigation_sta_table_schema, iter))
      sqlContext.sql(staSql.sql_count_in_and_out_station_request).rdd.foreachPartition(iter =>
        Tools.rdd2mysql(Config.aboutyun_station_sta_table, Config.aboutyun_station_sta_table_schema, iter))
      sqlContext.sql(staSql.sql_count_per_ip_per_url_pvs).rdd.foreachPartition(iter =>
        Tools.rdd2mysql(Config.aboutyun_suspicious_ip_sta_table, Config.aboutyun_suspicious_ip_sta_table_schema, iter))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
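One practical note for local testing: textFileStream only processes files that appear in the monitored directory after the StreamingContext has started, and each file should appear atomically (moved or copied in whole), so dropping a prepared log file into D:\work\aboutyun_log_analysis\log while the job is running is what triggers a non-empty batch. A tiny helper along these lines (file names and paths are placeholders, not part of the project) can be used to simulate a new log file arriving:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Hypothetical helper for local testing: drop a prepared log file into the
// directory watched by textFileStream to trigger a new micro-batch.
object FeedTestLog extends App {
  val src = Paths.get("D:\\work\\aboutyun_log_analysis\\prepared\\ex170104.log") // placeholder source file
  val dst = Paths.get("D:\\work\\aboutyun_log_analysis\\log\\ex170104.log")
  Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING)
}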
3. The conf package
The conf package contains the Config companion object, which holds the mapping between forum module names and ids, the MySQL connection settings, and the table names and column schemas used for the output results.
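The full Config source is not listed in this post (it is in the linked download). Purely as a hedged sketch, based only on the members referenced from Run and PreProcessing (Config.moduleMap, Config.mysqlUrl/mysqlUser/mysqlPassword, and the aboutyun_*_sta_table / *_schema pairs), it might look roughly like this; all values below are placeholders, not the project's real settings:

package conf

// Hypothetical sketch of Config; every value here is a placeholder.
object Config {
  // "module name,module id" pairs used to build the module_map helper table
  val moduleMap: Seq[String] = Seq(
    "Hadoop,1",   // placeholder mapping
    "Spark,2"
  )

  // MySQL connection settings (placeholders)
  val mysqlUrl = "jdbc:mysql://localhost:3306/aboutyun_sta"
  val mysqlUser = "root"
  val mysqlPassword = "123456"

  // Output table name plus its "field:type" schema string consumed by Tools.rdd2mysql
  val aboutyun_ip_sta_table = "ip_sta"
  val aboutyun_ip_sta_table_schema = "c_ip:string,pvs:int,date:string"
  // ... the other *_sta_table / *_schema pairs follow the same pattern
}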
4. The preprocessing package
The preprocessing package converts the raw log format into the format the streaming job needs, and registers a few helper temporary tables.
package preprocessing

import conf.Config
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class PreProcessing(ssc: StreamingContext, sqlContext: HiveContext, raw_log: DStream[String]) {

  // Register helper tables that later queries join against
  def registerHelperTable(): Unit = {
    val sc = ssc.sparkContext
    val rddModuleMap = sc.parallelize(Config.moduleMap)
      .map(s => Row.fromSeq(s.split(","))) // build the module-name-to-id mapping
    val schemaModuleMapString = "module id"
    val schemaModuleMap =
      StructType(
        schemaModuleMapString.split(" ")
          .map(fieldName => StructField(fieldName, StringType)))
    sqlContext.createDataFrame(rddModuleMap, schemaModuleMap).registerTempTable("module_map")
  }

  // Convert the raw log lines into the row format used downstream
  def transformLogFormat(): DStream[Row] = {
    registerHelperTable()
    val url_regex = "https?://.*(?=\\))".r            // match the URL followed by ")", i.e. the URL embedded in the user-agent
    val tid_regex = "((?<=tid=)|(?<=thread-))\\d+".r  // digits preceded by "tid=" or "thread-" are the post (thread) id
    val fid_regex = "((?<=fid=)|(?<=forum-))\\d+".r   // digits preceded by "fid=" or "forum-" are the module (forum) id
    val log_dstream = raw_log
      .filter(text => !text.contains("#"))            // drop comment lines
      .map(text => text.split(" "))                   // split each line on spaces
      .filter(array => array(3).contains(".php"))     // keep only requests to ".php" pages
      .map { array =>
        val url = if (array(6) != "-") array(6) else url_regex.findFirstIn(array(5)).getOrElse("")
        val tid = tid_regex.findFirstIn(url).getOrElse("")
        val fid = fid_regex.findFirstIn(url).getOrElse("")
        // println(url)
        val requestParameter = url match {
          case "http://www.aboutyun.com/home.php" => ""
          case _ =>
            if (url.split("\\?").length > 1) url.split("\\?")(1) else "其他"
        }
        // if (url.split("\\?").length == 1 && url.eq("http://www.aboutyun.com/home.php")) "" else url.split("\\?")(1)
        val navigation = requestParameter match {
          case "mod=guide" => "导读"              // guide
          case "mod=space&do=blog" => "博客"      // blog
          case "mod=collection" => "专题"          // topic collection
          case "mod=space&do=share" => "阅读分享"  // reading share
          case "" => "浏览"                        // browse
          case _ => "其他"                         // other
        }
        Row(array(0) + " " + array(1), array(4), array(5), url, array(3), tid, fid, navigation)
      }
    log_dstream
  }

}
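As a quick, self-contained check of what the three regular expressions extract (the URLs below are made-up examples, not real log entries), something like the following can be run in a Scala REPL or as a small App:

object RegexDemo extends App {
  val url_regex = "https?://.*(?=\\))".r
  val tid_regex = "((?<=tid=)|(?<=thread-))\\d+".r
  val fid_regex = "((?<=fid=)|(?<=forum-))\\d+".r

  // Hypothetical user-agent field containing an embedded URL that ends just before ")"
  val userAgent = "Mozilla/5.0+(compatible;+http://www.aboutyun.com/thread-12345-1-1.html)"
  // Hypothetical request URL carrying explicit tid/fid parameters
  val url = "http://www.aboutyun.com/forum.php?mod=viewthread&tid=12345&fid=18"

  println(url_regex.findFirstIn(userAgent)) // Some(http://www.aboutyun.com/thread-12345-1-1.html)
  println(tid_regex.findFirstIn(url))       // Some(12345)
  println(fid_regex.findFirstIn(url))       // Some(18)
}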
5. The business package
StaSQL in the business package holds the Spark SQL statements that actually compute the business statistics.
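StaSQL itself is not reproduced in this post. A rough, hypothetical sketch of its shape, assuming only the user_behavior temp table and the schemaLog column names from Run (the real project's SQL may be phrased quite differently), could be:

package business

// Hypothetical sketch of StaSQL; the actual queries in the project may differ.
case class StaSQL(dateString: String) {
  // PVs per client IP, against the user_behavior temp table registered in Run
  val sql_count_per_ip_pvs: String =
    s"select c_ip, count(*) as pvs, '$dateString' as date " +
      "from user_behavior group by c_ip"

  // Top 50 hottest posts by thread id
  val sql_count_top_50_tid: String =
    s"select tid, count(*) as pvs, '$dateString' as date " +
      "from user_behavior where tid != '' " +
      "group by tid order by pvs desc limit 50"

  // ... sql_count_module_pvs, sql_count_navigation,
  //     sql_count_in_and_out_station_request and sql_count_per_ip_per_url_pvs
  //     follow the same pattern
}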
6. The util package
The util package contains the create-table statements for the output result tables and the code that writes RDD contents to MySQL.
package util

import java.sql.{Connection, DriverManager, PreparedStatement}

import conf.Config

/**
 * Utility object
 */
object Tools {

  // Write the rows of one RDD partition into a MySQL table.
  // `schema` is a comma-separated "field:type" string parsed by getFieldAndTypeTuple below.
  def rdd2mysql(tableName: String, schema: String, iterator: Iterator[org.apache.spark.sql.Row]): Unit = {
    // Parse the schema string into (field, type) tuples
    def getFieldAndTypeTuple(schema: String): Array[(String, String)] = {
      schema.split(",").map { s =>
        val Array(field, fieldType) = s.trim.split(":")
        (field, fieldType)
      }
    }

    val iterArray = iterator.toArray
    val fieldAndTypeTuple: Array[(String, String)] = getFieldAndTypeTuple(schema)
    val fields = fieldAndTypeTuple.map(_._1).mkString(",")
    val values = (for (i <- 0 until fieldAndTypeTuple.length) yield "?").mkString(",")
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into %s (%s) values (%s)".format(tableName, fields, values)
    try {
      conn = DriverManager.getConnection(Config.mysqlUrl, Config.mysqlUser, Config.mysqlPassword)
      conn.setAutoCommit(false)
      ps = conn.prepareStatement(sql)
      // Each row comes from the RDD partition; bind the parameters according to the declared field type
      for (row <- iterArray) {
        for (i <- 0 until row.length) {
          val fieldType = fieldAndTypeTuple(i)._2
          if (fieldType == "string") {
            ps.setString(i + 1, row(i).toString)
          } else if (fieldType == "int") {
            ps.setInt(i + 1, row(i).toString.toInt)
          } else if (fieldType == "double") {
            ps.setDouble(i + 1, row(i).toString.toDouble)
          }
        }
        ps.addBatch()
      }
      // Execute all buffered inserts in one batch and commit
      ps.executeBatch()
      conn.commit()
    } catch {
      case e: Exception => println(e)
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }
}
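Two small notes on this part. First, rdd2mysql is driven from foreachPartition in Run, so one MySQL connection is opened per partition rather than per row, which keeps connection overhead bounded. Second, the create-table statements mentioned above are not reproduced in this post; purely as an illustration (table and column names are placeholders consistent with the hypothetical Config sketch above, not the project's real definitions), one of them might be kept as a string like:

// Hypothetical DDL string for one result table; names and column types are placeholders
val createIpStaTable: String =
  """create table if not exists ip_sta (
    |  c_ip varchar(64),
    |  pvs  int,
    |  date varchar(16)
    |)""".stripMargin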
Note: the complete code is available at http://pan.baidu.com/s/1bp2AuuF (password: cn7a).