分享

Sparksql实战 - 用户行为日志(下)

本帖最后由 a87758133 于 2019-4-4 17:15 编辑
问题导读


1.如何进行性能优化?
2.如何进行数据可视化?
3.如何使用ECharts展示静态饼图数据?
4.如何使用ECharts展示动态查询MySQL中的数据
5.如何使用Zeppelin进行统计结果展示?

上一篇:
Sparksql实战 - 用户行为日志(上)
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26962&page=1&extra=#pid276966



性能优化

性能优化之存储格式的选择
存储格式的选择:http://www.infoq.com/cn/articles/bigdata-store-choose/

性能调优之压缩格式的选择
压缩格式的选择:https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-compression-analysis/
注意如果要用支持分割的压缩格式,只有Bzip2直接支持分割

spark中支持配置压缩格式
20181215145722904.png

性能优化之代码优化
  • 选择高性能的算子
  • 复用已有的数据
[mw_shl_code=scala,true]package com.imooc.log

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer

/**
* TopN统计Spark作业:复用已有的数据
*/
object TopNStatJob2 {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
      .master("local[2]").getOrCreate()


    val accessDF = spark.read.format("parquet").load("/Users/rocky/data/imooc/clean")

//    accessDF.printSchema()
//    accessDF.show(false)

    val day = "20170511"

    import spark.implicits._
    val commonDF = accessDF.filter($"day" === day && $"cmsType" === "video")

    commonDF.cache()

    StatDAO.deleteData(day)

    //最受欢迎的TopN课程
    videoAccessTopNStat(spark, commonDF)

    //按照地市进行统计TopN课程
    cityAccessTopNStat(spark, commonDF)

    //按照流量进行统计
    videoTrafficsTopNStat(spark, commonDF)

    commonDF.unpersist(true)

    spark.stop()
  }

  /**
   * 按照流量进行统计
   */
  def videoTrafficsTopNStat(spark: SparkSession, commonDF:DataFrame): Unit = {
    import spark.implicits._

    val cityAccessTopNDF = commonDF.groupBy("day","cmsId")
      .agg(sum("traffic").as("traffics"))
    .orderBy($"traffics".desc)
    //.show(false)

    /**
     * 将统计结果写入到MySQL中
     */
    try {
      cityAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")
          list.append(DayVideoTrafficsStat(day, cmsId,traffics))
        })

        StatDAO.insertDayVideoTrafficsAccessTopN(list)
      })
    } catch {
      case e:Exception => e.printStackTrace()
    }

  }

  /**
   * 按照地市进行统计TopN课程
   */
  def cityAccessTopNStat(spark: SparkSession, commonDF:DataFrame): Unit = {

    val cityAccessTopNDF = commonDF
    .groupBy("day","city","cmsId")
    .agg(count("cmsId").as("times"))

    //cityAccessTopNDF.show(false)

    //Window函数在Spark SQL的使用

    val top3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
      .orderBy(cityAccessTopNDF("times").desc)
      ).as("times_rank")
    ).filter("times_rank <=3") //.show(false)  //Top3


    /**
     * 将统计结果写入到MySQL中
     */
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })

        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e:Exception => e.printStackTrace()
    }

  }


    /**
   * 最受欢迎的TopN课程
   */
  def videoAccessTopNStat(spark: SparkSession, commonDF:DataFrame): Unit = {

    /**
     * 使用DataFrame的方式进行统计
     */
    import spark.implicits._

    val videoAccessTopNDF = commonDF
    .groupBy("day","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)

    videoAccessTopNDF.show(false)

    /**
     * 使用SQL的方式进行统计
     */
//    accessDF.createOrReplaceTempView("access_logs")
//    val videoAccessTopNDF = spark.sql("select day,cmsId, count(1) as times from access_logs " +
//      "where day='20170511' and cmsType='video' " +
//      "group by day,cmsId order by times desc")
//
//    videoAccessTopNDF.show(false)

    /**
     * 将统计结果写入到MySQL中
     */
    try {
      videoAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")

          /**
           * 不建议大家在此处进行数据库的数据插入
           */

          list.append(DayVideoAccessStat(day, cmsId, times))
        })

        StatDAO.insertDayVideoAccessTopN(list)
      })
    } catch {
      case e:Exception => e.printStackTrace()
    }

  }

}[/mw_shl_code]
性能调优之参数优化
http://spark.apache.org/docs/2.2.1/sql-programming-guide.html#other-configuration-options
20181215150410517.png
一般在生产上200是不够用的
[mw_shl_code=shell,true]调整并行度

./bin/spark-submit \

--class com.imooc.log.TopNStatJobYARN \

--name TopNStatJobYARN \

--master yarn \

--executor-memory 1G \

--num-executors 1 \

--conf spark.sql.shuffle.partitions=100 \

/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \

hdfs://hadoop001:8020/imooc/clean 20170511 [/mw_shl_code]
分区字段类型推测:spark.sql.sources.partitionColumnTypelnference.enabled
这个在上面某小节中已经提到过

功能实现之数据可视化展示概述
百度echarts:
http://echarts.baidu.com/echarts2/doc/example.html

数据可视化:一副图片最伟大的价值莫过于它能够使得我们实际看到的比我们期望看到的内容更加丰富

常见的可视化框架
1)echarts
2)highcharts
3)D3.js
4)HUE (不需要开发;直接sql就可图形化查看)
5)Zeppelin (不需要开发;直接sql就可图形化查看)

ECharts饼图静态数据展示
五分钟上手:
http://echarts.baidu.com/tutorial.html#5 分钟上手 ECharts
创建一个webapp
20181215151735564.png

20181215151959891.png

[mw_shl_code=text,true]<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Echarts HelloWorld</title>

    <!-- 引入 ECharts 文件 -->
    <script src="js/echarts.min.js"></script>
</head>
<body>

<!-- 为 ECharts 准备一个具备大小(宽高)的 DOM -->
<div id="main" style="width: 600px;height:400px;"></div>

<script type="text/javascript">
    // 基于准备好的dom,初始化echarts实例
    var myChart = echarts.init(document.getElementById('main'));

    // 指定图表的配置项和数据
    var option = {
        title : {
            text: '某站点用户访问来源',
            subtext: '纯属虚构',
            x:'center'
        },
        tooltip : {
            trigger: 'item',
            formatter: "{a} <br/>{b} : {c} ({d}%)"
        },
        legend: {
            orient: 'vertical',
            left: 'left',
            data: ['直接访问','邮件营销','联盟广告','视频广告','搜索引擎']
        },
        series : [
            {
                name: '访问来源',
                type: 'pie',
                radius : '55%',
                center: ['50%', '60%'],
                data:[
                    {value:335, name:'直接访问'},
                    {value:310, name:'邮件营销'},
                    {value:234, name:'联盟广告'},
                    {value:135, name:'视频广告'},
                    {value:1548, name:'搜索引擎'}
                ],
                itemStyle: {
                    emphasis: {
                        shadowBlur: 10,
                        shadowOffsetX: 0,
                        shadowColor: 'rgba(0, 0, 0, 0.5)'
                    }
                }
            }
        ]
    };


    // 使用刚指定的配置项和数据显示图表。
    myChart.setOption(option);
</script>
</body>
</html>
[/mw_shl_code]
右键运行test.html
201812151522120.png
结果
2018121515232869.png

ECharts饼图动态展示之一查询MySQL中的数据
以下只是需求一的展示;其他需求和需求一实现方式一样

添加maven依赖
[mw_shl_code=xml,true]<dependencies>

        <dependency>

            <groupId>javax.servlet</groupId>

            <artifactId>servlet-api</artifactId>

            <version>2.5</version>

        </dependency>



        <dependency>

            <groupId>javax.servlet</groupId>

            <artifactId>jsp-api</artifactId>

            <version>2.0</version>

        </dependency>



        <dependency>

            <groupId>mysql</groupId>

            <artifactId>mysql-connector-java</artifactId>

            <version>5.1.38</version>

        </dependency>



        <dependency>

            <groupId>net.sf.json-lib</groupId>

            <artifactId>json-lib</artifactId>

            <version>2.4</version>

            <classifier>jdk15</classifier>

        </dependency>

    </dependencies>[/mw_shl_code]

Utils工具类
[mw_shl_code=java,true]package com.imooc.utils;

import java.sql.*;

/**
* 操作MySQL的工具类
*/
public class MySQLUtils {

    private static final String USERNAME = "root";

    private static final String PASSWORD = "root";

    private static final String DRIVERCLASS = "com.mysql.jdbc.Driver";

    private static final String URL = "jdbc:mysql://localhost:3306/imooc_project";


    /**
     * 获取数据库连接
     */
    public static Connection getConnection() {
        Connection connection = null;
        try {
            Class.forName(DRIVERCLASS);
            connection = DriverManager.getConnection(URL,USERNAME,PASSWORD);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return connection;
    }


    /**
     * 释放资源
     */
    public static void release(Connection connection, PreparedStatement pstmt, ResultSet rs) {
        if(rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        if(pstmt != null) {
            try {
                pstmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        if(connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(getConnection());
    }

}[/mw_shl_code]
实体类
[mw_shl_code=java,true]package com.imooc.domain;

public class VideoAccessTopN {

    private String name;
    private long value ;


    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}[/mw_shl_code]

dao层
[mw_shl_code=java,true]package com.imooc.dao;

import com.imooc.domain.VideoAccessTopN;
import com.imooc.utils.MySQLUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* 面向接口编程
*/
public class VideoAccessTopNDAO {


    static Map<String,String> courses = new HashMap<String,String>();
    static {
        courses.put("4000", "MySQL优化");
        courses.put("4500", "Crontab");
        courses.put("4600", "Swift");
        courses.put("14540", "SpringData");
        courses.put("14704", "R");
        courses.put("14390", "机器学习");
        courses.put("14322", "redis");
        courses.put("14390", "神经网络");
        courses.put("14623", "Docker");
    }

    /**
     * 根据课程编号查询课程名称
     */
    public String getCourseName(String id) {
        return courses.get(id);
    }


    /**
     * 根据day查询当天的最受欢迎的Top5课程
     * @param day
     */
    public List<VideoAccessTopN> query(String day) {
        List<VideoAccessTopN> list = new ArrayList<VideoAccessTopN>();

        Connection connection = null;
        PreparedStatement psmt = null;
        ResultSet rs = null;

        try {
            connection = MySQLUtils.getConnection();
            String sql = "select cms_id ,times  from  day_video_access_topn_stat where day =? order by times desc limit 5";
            psmt = connection.prepareStatement(sql);
            psmt.setString(1, day);

            rs = psmt.executeQuery();

            VideoAccessTopN domain = null;
            while(rs.next()) {
                domain = new VideoAccessTopN();
                /**
                 * TODO... 在页面上应该显示的是课程名称,而我们此时拿到的是课程编号
                 *
                 * 如何根据课程编号去获取课程名称呢?
                 * 编号和名称是有一个对应关系的,一般是存放在关系型数据库
                 */
                domain.setName(getCourseName(rs.getLong("cms_id")+""));
                domain.setValue(rs.getLong("times"));

                list.add(domain);
            }

        }catch (Exception e) {
            e.printStackTrace();
        } finally {
            MySQLUtils.release(connection, psmt, rs);
        }
        return list;
    }

    public static void main(String[] args) {
        VideoAccessTopNDAO dao = new VideoAccessTopNDAO();
        List<VideoAccessTopN> list = dao.query("20170511");
        for(VideoAccessTopN result: list) {
            System.out.println(result.getName() + " , " + result.getValue());
        }
    }

}[/mw_shl_code]
web层
[mw_shl_code=java,true]package com.imooc.web;

import com.imooc.dao.VideoAccessTopNDAO;
import com.imooc.domain.VideoAccessTopN;
import net.sf.json.JSONArray;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

/**
* 最受欢迎的TOPN课程
*
* Web ==> Service ==> DAO
*/
public class VideoAccessTopNServlet extends HttpServlet{

    private VideoAccessTopNDAO dao;

    @Override
    public void init() throws ServletException {
        dao = new VideoAccessTopNDAO();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String day = req.getParameter("day");

        List<VideoAccessTopN> results =  dao.query(day);
        JSONArray json = JSONArray.fromObject(results);

        resp.setContentType("text/html;charset=utf-8");

        PrintWriter writer = resp.getWriter();
        writer.println(json);
        writer.flush();
        writer.close();

    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        this.doGet(req, resp);
    }
[/mw_shl_code]
ECharts饼图动态展示之二前端开发
web.xml配置
[mw_shl_code=xml,true]<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
  <display-name>Archetype Created Web Application</display-name>

  <servlet>
    <servlet-name>stat</servlet-name>
    <servlet-class>com.imooc.web.VideoAccessTopNServlet</servlet-class>
  </servlet>
  
  <servlet-mapping>
    <servlet-name>stat</servlet-name>
    <url-pattern>/stat</url-pattern>
  </servlet-mapping>
</web-app>[/mw_shl_code]
topn需求一的页面
[mw_shl_code=text,true]<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Echarts HelloWorld</title>

    <!-- 引入 ECharts 文件 -->
    <script src="js/echarts.min.js"></script>
    <SCRIPT src="js/jquery.js"></SCRIPT>
</head>
<body>

<!-- 为 ECharts 准备一个具备大小(宽高)的 DOM -->
<div id="main" style="width: 600px;height:400px;"></div>

<script type="text/javascript">
    // 基于准备好的dom,初始化echarts实例
    var myChart = echarts.init(document.getElementById('main'));

    // 指定图表的配置项和数据
    var option = {
        title : {
            text: '主站最受欢迎的TopN课程',
            x:'center'
        },
        tooltip : {
            trigger: 'item',
            formatter: "{a} <br/>{b} : {c} ({d}%)"
        },
        legend: {
            orient: 'vertical',
            left: 'left',
            data: []
        },
        series : [
            {
                name: '访问次数',
                type: 'pie',
                radius : '55%',
                center: ['50%', '60%'],
                data:(function(){
                    var courses = [];
                    $.ajax({
                        type:"GET",
                        url:"/stat?day=20170511",
                        dataType:'json',
                        async:false,
                        success:function(result) {
                            for(var i=0;i<result.length; i++){
                                courses.push({"value": result.value,"name":result.name});
                            }
                        }
                    })
                    return courses;
                })(),
                itemStyle: {
                    emphasis: {
                        shadowBlur: 10,
                        shadowOffsetX: 0,
                        shadowColor: 'rgba(0, 0, 0, 0.5)'
                    }
                }
            }
        ]
    };


    // 使用刚指定的配置项和数据显示图表。
    myChart.setOption(option);
</script>
</body>
</html>[/mw_shl_code]
20181215154230789.png


使用Zeppelin进行统计结果的展示
下载Zeppelin
http://zeppelin.apache.org/download.html
解压后直接进入bin目录运行
20181215154447222.png

默认端口是8080;打开后
20181215154642967.png
20181215154857956.png

20181215155203283.png



2018121515554083.png








最新经典文章,欢迎关注公众号

来源:CSDN

作者:-无妄-

原文:《Sparksql实战 - 用户行为日志》

https://blog.csdn.net/bingdianone/article/details/85013293#imooc_44


已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条