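This post was last edited by a87758133 on 2019-4-4 17:15
Guiding questions
1. How can performance be optimized?
2. How can the data be visualized?
3. How do you display static pie chart data with ECharts?
4. How do you use ECharts to dynamically display data queried from MySQL?
5. How do you display the statistics with Zeppelin?
Previous part:
Spark SQL in Practice - User Behavior Logs (Part 1)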
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26962&page=1&extra=#pid276966
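Performance optimization
Performance optimization: choosing a storage format
Choosing a storage format: http://www.infoq.com/cn/articles/bigdata-store-choose/
Performance tuning: choosing a compression format
Choosing a compression format: https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-compression-analysis/
Note: if you need a compression format that supports splitting, only Bzip2 supports it directly (LZO files become splittable only after they have been indexed).
Spark lets you configure the compression codec, for example spark.sql.parquet.compression.codec for Parquet output.
Performance optimization: code optimization
The job below filters the data once, caches it, and reuses it for all three statistics. Each partition's results are written to MySQL in a single batch.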
[mw_shl_code=scala,true]package com.imooc.log

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer

// StatDAO and the Day*Stat case classes are defined in the previous part

/**
  * TopN statistics Spark job: reuse data that has already been loaded
  */
object TopNStatJob2 {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]").getOrCreate()

    val accessDF = spark.read.format("parquet").load("/Users/rocky/data/imooc/clean")
    // accessDF.printSchema()
    // accessDF.show(false)

    val day = "20170511"

    import spark.implicits._
    // Filter once and cache the result; all three statistics below reuse it
    val commonDF = accessDF.filter($"day" === day && $"cmsType" === "video")
    commonDF.cache()

    // Remove this day's previously written results before inserting new ones
    StatDAO.deleteData(day)

    // Most popular TopN courses
    videoAccessTopNStat(spark, commonDF)

    // TopN courses by city
    cityAccessTopNStat(spark, commonDF)

    // TopN courses by traffic
    videoTrafficsTopNStat(spark, commonDF)

    commonDF.unpersist(true)

    spark.stop()
  }

  /**
    * TopN courses by traffic
    */
  def videoTrafficsTopNStat(spark: SparkSession, commonDF: DataFrame): Unit = {
    import spark.implicits._

    val trafficsTopNDF = commonDF.groupBy("day", "cmsId")
      .agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
    //.show(false)

    /**
      * Write the statistics to MySQL
      */
    try {
      trafficsTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")
          list.append(DayVideoTrafficsStat(day, cmsId, traffics))
        })

        // One batch insert per partition
        StatDAO.insertDayVideoTrafficsAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
    * TopN courses by city
    */
  def cityAccessTopNStat(spark: SparkSession, commonDF: DataFrame): Unit = {
    val cityAccessTopNDF = commonDF
      .groupBy("day", "city", "cmsId")
      .agg(count("cmsId").as("times"))
    //cityAccessTopNDF.show(false)

    // Window function in Spark SQL: rank the courses within each city by times
    val top3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
        .orderBy(cityAccessTopNDF("times").desc)
      ).as("times_rank")
    ).filter("times_rank <= 3") //.show(false) // Top 3

    /**
      * Write the statistics to MySQL
      */
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })

        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
    * Most popular TopN courses
    */
  def videoAccessTopNStat(spark: SparkSession, commonDF: DataFrame): Unit = {
    /**
      * Compute the statistics with the DataFrame API
      */
    import spark.implicits._

    val videoAccessTopNDF = commonDF
      .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)

    videoAccessTopNDF.show(false)

    /**
      * Equivalent SQL version (registers commonDF, because accessDF is not in scope in this method)
      */
    // commonDF.createOrReplaceTempView("access_logs")
    // val videoAccessTopNDF = spark.sql("select day,cmsId, count(1) as times from access_logs " +
    //   "where day='20170511' and cmsType='video' " +
    //   "group by day,cmsId order by times desc")
    //
    // videoAccessTopNDF.show(false)

    /**
      * Write the statistics to MySQL
      */
    try {
      videoAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")

          /**
            * Don't insert into the database here, once per record;
            * collect the records and insert them in one batch per partition instead
            */
          list.append(DayVideoAccessStat(day, cmsId, times))
        })

        StatDAO.insertDayVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}[/mw_shl_code]
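The window-function step in cityAccessTopNStat can be illustrated without Spark. The plain-Java sketch below computes the same result as row_number() over (partition by city order by times desc) followed by times_rank <= 3. It is an illustration only: the Row type and the sample data are hypothetical. Spark does not guarantee an order among rows with equal times, so the sketch breaks ties by cmsId to keep its output deterministic.

[mw_shl_code=java,true]import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TopNPerGroup {

    // Hypothetical aggregated row mirroring the city/cmsId/times columns (day omitted)
    public static final class Row {
        public final String city;
        public final long cmsId;
        public final long times;

        public Row(String city, long cmsId, long times) {
            this.city = city;
            this.cmsId = cmsId;
            this.times = times;
        }
    }

    // Groups rows by city, orders each group by times descending (ties by cmsId)
    // and keeps the first n rows; a row's rank is its list index plus one.
    public static Map<String, List<Row>> topNPerCity(List<Row> rows, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must not be negative");
        }
        Map<String, List<Row>> byCity = new TreeMap<>();
        for (Row r : rows) {
            byCity.computeIfAbsent(r.city, k -> new ArrayList<>()).add(r);
        }
        Comparator<Row> order = Comparator.comparingLong((Row r) -> r.times).reversed()
                .thenComparingLong(r -> r.cmsId);
        Map<String, List<Row>> result = new TreeMap<>();
        for (Map.Entry<String, List<Row>> e : byCity.entrySet()) {
            List<Row> group = new ArrayList<>(e.getValue());
            group.sort(order);
            result.put(e.getKey(), new ArrayList<>(group.subList(0, Math.min(n, group.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("Beijing", 4000, 10), new Row("Beijing", 4500, 30),
                new Row("Beijing", 4600, 20), new Row("Beijing", 14540, 5),
                new Row("Shanghai", 4000, 7));
        for (Map.Entry<String, List<Row>> e : topNPerCity(rows, 3).entrySet()) {
            List<Row> top = e.getValue();
            for (int i = 0; i < top.size(); i++) {
                System.out.println(e.getKey() + " rank " + (i + 1) + ": cmsId=" + top.get(i).cmsId
                        + ", times=" + top.get(i).times);
            }
        }
    }
}[/mw_shl_code]

Running main lists Beijing's courses 4500, 4600 and 4000; course 14540 falls outside the top 3. It then lists Shanghai's single course, 4000.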
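Performance tuning: parameter tuning
http://spark.apache.org/docs/2.2.1/sql-programming-guide.html#other-configuration-options
spark.sql.shuffle.partitions defaults to 200, which is usually not enough in production; set it to suit your data volume.
Adjusting the parallelism:
[mw_shl_code=shell,true]./bin/spark-submit \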
--class com.imooc.log.TopNStatJobYARN \
--name TopNStatJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--conf spark.sql.shuffle.partitions=100 \
/home/hadoop/lib/sql-1.0-jar-with-dependencies.jar \
hdfs://hadoop001:8020/imooc/clean 20170511 [/mw_shl_code]
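The post only says that the default of 200 is usually too small. One commonly used rule of thumb is an assumption, not something the original states: divide the expected shuffle data size by a target size per partition and round up. The sketch below performs only that arithmetic. The 128 MB target, the 200 minimum and the 50 GB job are hypothetical values you would tune.

[mw_shl_code=java,true]public class ShufflePartitionSizer {

    // Suggests a value for spark.sql.shuffle.partitions:
    // ceil(shuffleBytes / targetBytesPerPartition), but never below minPartitions.
    public static int suggestPartitions(long shuffleBytes, long targetBytesPerPartition, int minPartitions) {
        if (shuffleBytes < 0 || targetBytesPerPartition <= 0 || minPartitions < 1) {
            throw new IllegalArgumentException("invalid sizing arguments");
        }
        // Ceiling division using integer arithmetic only (no overflow from adding first)
        long needed = shuffleBytes / targetBytesPerPartition
                + (shuffleBytes % targetBytesPerPartition == 0 ? 0 : 1);
        return (int) Math.min(Integer.MAX_VALUE, Math.max(minPartitions, needed));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Hypothetical job: about 50 GB of shuffle data, aiming at roughly 128 MB per partition
        int partitions = suggestPartitions(50L * 1024L * mb, 128L * mb, 200);
        System.out.println("--conf spark.sql.shuffle.partitions=" + partitions);
    }
}[/mw_shl_code]

For the hypothetical 50 GB job this suggests 400 partitions. The measured shuffle size (for example from the Spark UI) is the better input than a guess.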
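Partition column type inference: spark.sql.sources.partitionColumnTypeInference.enabled
The job above sets this option to false in its .config(...) call. As a result, partition columns such as day are read as strings instead of being inferred as numbers, which is why the code can read day with getAs[String].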
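Turning inference off matters because the cleaned data is partitioned by day, in directories such as day=20170511. With inference on, Spark would type day as an integer, and getAs[String]("day") would fail. The sketch below is a deliberately simplified model of that behavior, not Spark's code. It tries int, then long, and otherwise keeps the string; Spark's real rules also cover other numeric types, dates and timestamps. The city=0571 segment is hypothetical and shows how inference can also drop leading zeros.

[mw_shl_code=java,true]import java.util.AbstractMap;
import java.util.Map;

public class PartitionValueInference {

    // Simplified model (not Spark's implementation): with inference on, try int,
    // then long; otherwise keep the raw string taken from the directory name.
    public static Object inferValue(String raw, boolean inferenceEnabled) {
        if (!inferenceEnabled) {
            return raw;
        }
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException notAnInt) {
            // fall through to the next candidate type
        }
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException notALong) {
            return raw;
        }
    }

    // Splits a partition directory name such as "day=20170511" into column name and value
    public static Map.Entry<String, Object> parseSegment(String segment, boolean inferenceEnabled) {
        int eq = segment.indexOf('=');
        if (eq <= 0) {
            throw new IllegalArgumentException("not a key=value segment: " + segment);
        }
        String raw = segment.substring(eq + 1);
        return new AbstractMap.SimpleEntry<>(segment.substring(0, eq), inferValue(raw, inferenceEnabled));
    }

    public static void main(String[] args) {
        for (boolean enabled : new boolean[] {true, false}) {
            Map.Entry<String, Object> day = parseSegment("day=20170511", enabled);
            Map.Entry<String, Object> city = parseSegment("city=0571", enabled); // hypothetical column
            System.out.println("inference=" + enabled + ": day is a "
                    + day.getValue().getClass().getSimpleName() + ", city=" + city.getValue());
        }
    }
}[/mw_shl_code]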
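Feature implementation: data visualization overview
Baidu ECharts:
http://echarts.baidu.com/echarts2/doc/example.html
Data visualization: the greatest value of a picture is that it shows us more than we expected to see.
Common visualization frameworks:
1) ECharts
2) Highcharts
3) D3.js
4) HUE (no development needed; just write SQL and view the results as charts)
5) Zeppelin (no development needed; just write SQL and view the results as charts)
ECharts pie chart: displaying static data
Get started in five minutes:
http://echarts.baidu.com/tutorial.html#5 分钟上手 ECharts
Create a webapp and add a test.html page with the following content: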
[mw_shl_code=text,true]<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Echarts HelloWorld</title>
    <!-- Include the ECharts library -->
    <script src="js/echarts.min.js"></script>
</head>
<body>
    <!-- Prepare a DOM element with an explicit size (width and height) for ECharts -->
    <div id="main" style="width: 600px;height:400px;"></div>
    <script type="text/javascript">
        // Initialize an ECharts instance on the prepared DOM element
        var myChart = echarts.init(document.getElementById('main'));

        // Chart options and data
        var option = {
            title : {
                text: 'Traffic Sources of a Website',
                subtext: 'Fictional data',
                x:'center'
            },
            tooltip : {
                trigger: 'item',
                formatter: "{a} <br/>{b} : {c} ({d}%)"
            },
            legend: {
                orient: 'vertical',
                left: 'left',
                data: ['Direct','Email marketing','Affiliate ads','Video ads','Search engine']
            },
            series : [
                {
                    name: 'Traffic source',
                    type: 'pie',
                    radius : '55%',
                    center: ['50%', '60%'],
                    data:[
                        {value:335, name:'Direct'},
                        {value:310, name:'Email marketing'},
                        {value:234, name:'Affiliate ads'},
                        {value:135, name:'Video ads'},
                        {value:1548, name:'Search engine'}
                    ],
                    itemStyle: {
                        emphasis: {
                            shadowBlur: 10,
                            shadowOffsetX: 0,
                            shadowColor: 'rgba(0, 0, 0, 0.5)'
                        }
                    }
                }
            ]
        };

        // Render the chart with the options and data specified above
        myChart.setOption(option);
    </script>
</body>
</html>
[/mw_shl_code]
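In the tooltip formatter, {a} is the series name, {b} the data item's name, {c} its value and {d} its share of the pie in percent. The sketch below recomputes those shares for the sample data so you can check what the tooltip shows. It rounds each share independently to two decimals, whereas ECharts may adjust the rounding so the shares add up to 100, so treat it as an approximation. The class name and the English labels are illustrative.

[mw_shl_code=java,true]import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class PieTooltip {

    // Share of one slice as a percentage string with two decimals, e.g. "60.42"
    public static String percent(long value, long total) {
        if (total <= 0) {
            throw new IllegalArgumentException("total must be positive");
        }
        return String.format(Locale.ROOT, "%.2f", value * 100.0 / total);
    }

    // Builds "{a} <br/>{b} : {c} ({d}%)"-style text for every slice, keyed by slice name
    public static Map<String, String> tooltips(String seriesName, Map<String, Long> slices) {
        long total = 0;
        for (long v : slices.values()) {
            total += v;
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> s : slices.entrySet()) {
            result.put(s.getKey(), seriesName + " <br/>" + s.getKey() + " : " + s.getValue()
                    + " (" + percent(s.getValue(), total) + "%)");
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as the static example (labels in English)
        Map<String, Long> slices = new LinkedHashMap<>();
        slices.put("Direct", 335L);
        slices.put("Email marketing", 310L);
        slices.put("Affiliate ads", 234L);
        slices.put("Video ads", 135L);
        slices.put("Search engine", 1548L);
        tooltips("Traffic source", slices).values().forEach(System.out::println);
    }
}[/mw_shl_code]

With a total of 2562, the "Search engine" slice comes out at 60.42% and "Email marketing" at 12.10%.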
Right-click test.html and run it; the page opens in the browser.
Result: [figure: the rendered pie chart]
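Dynamic ECharts pie chart, part 1: querying the data in MySQL
Only requirement 1 (the most popular TopN courses) is shown below; the other requirements are implemented the same way.
Add the Maven dependencies: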
[mw_shl_code=xml,true]<dependencies>
    <!-- The servlet container (e.g. Tomcat) supplies the Servlet/JSP APIs, hence "provided" -->
    <dependency>
        <groupId>javax.servlet</groupId>
        <artifactId>servlet-api</artifactId>
        <version>2.5</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>javax.servlet</groupId>
        <artifactId>jsp-api</artifactId>
        <version>2.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>net.sf.json-lib</groupId>
        <artifactId>json-lib</artifactId>
        <version>2.4</version>
        <classifier>jdk15</classifier>
    </dependency>
</dependencies>[/mw_shl_code]
Utility class:
[mw_shl_code=java,true]package com.imooc.utils;

import java.sql.*;

/**
 * Utility class for working with MySQL
 */
public class MySQLUtils {

    private static final String USERNAME = "root";
    private static final String PASSWORD = "root";
    private static final String DRIVERCLASS = "com.mysql.jdbc.Driver";
    private static final String URL = "jdbc:mysql://localhost:3306/imooc_project";

    /**
     * Get a database connection (returns null if connecting fails)
     */
    public static Connection getConnection() {
        Connection connection = null;
        try {
            Class.forName(DRIVERCLASS);
            connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }

    /**
     * Release resources: close the ResultSet, the statement and the connection, in that order
     */
    public static void release(Connection connection, PreparedStatement pstmt, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (pstmt != null) {
            try {
                pstmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    // Quick check: prints the connection object if MySQL is reachable
    public static void main(String[] args) {
        System.out.println(getConnection());
    }
}[/mw_shl_code]
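MySQLUtils.release closes the ResultSet, then the statement, then the connection, skipping nulls and logging failures. ResultSet, PreparedStatement and Connection all implement AutoCloseable (Java 7+), so the same idea can be written once for any resources. The closeQuietly helper below is hypothetical and not part of the original project. In new code, try-with-resources gives the same reverse-of-opening close order automatically.

[mw_shl_code=java,true]import java.util.ArrayList;
import java.util.List;

public class CloseUtils {

    // Closes resources in the order given (e.g. rs, pstmt, connection), skipping nulls.
    // A failure to close one resource does not stop the others from being closed;
    // the failures are returned so the caller can log them.
    public static List<Exception> closeQuietly(AutoCloseable... resources) {
        List<Exception> failures = new ArrayList<>();
        for (AutoCloseable r : resources) {
            if (r == null) {
                continue;
            }
            try {
                r.close();
            } catch (Exception e) {
                failures.add(e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Stand-ins for real JDBC objects: each records that it was closed
        List<String> closed = new ArrayList<>();
        AutoCloseable rs = () -> closed.add("rs");
        AutoCloseable pstmt = () -> { throw new IllegalStateException("pstmt close failed"); };
        AutoCloseable connection = () -> closed.add("connection");
        List<Exception> failures = closeQuietly(rs, pstmt, null, connection);
        System.out.println("closed " + closed + ", failures: " + failures.size());
    }
}[/mw_shl_code]

Even though closing pstmt fails, the connection is still closed, which is the property the original release method is after.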
Entity class:
[mw_shl_code=java,true]package com.imooc.domain;

/**
 * One pie slice: a course name and its visit count.
 * The field names match the {name, value} data items the ECharts pie chart expects.
 */
public class VideoAccessTopN {

    private String name;
    private long value;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}[/mw_shl_code]
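The fields are called name and value because each ECharts pie data item has the shape {"name": ..., "value": ...}. The servlet relies on json-lib to turn the list into JSON. Purely to show the target format, here is a dependency-free sketch that builds the same kind of array from ordered (name, value) pairs. The class name, the minimal string escaping and the sample counts are assumptions, not the project's code.

[mw_shl_code=java,true]import java.util.LinkedHashMap;
import java.util.Map;

public class PieDataJson {

    // Escapes the characters that must not appear raw inside a JSON string
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Turns ordered (name -> value) pairs into [{"name":"...","value":n},...]
    public static String toPieData(Map<String, Long> items) {
        StringBuilder sb = new StringBuilder("[");
        for (Map.Entry<String, Long> e : items.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append("{\"name\":\"").append(escape(e.getKey()))
              .append("\",\"value\":").append(e.getValue()).append('}');
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        Map<String, Long> items = new LinkedHashMap<>();
        items.put("Docker", 12L);      // hypothetical counts
        items.put("SpringData", 7L);
        System.out.println(toPieData(items));
    }
}[/mw_shl_code]

Key order inside each object does not matter to ECharts, so json-lib's output may list the properties in a different order.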
DAO layer:
[mw_shl_code=java,true]package com.imooc.dao;

import com.imooc.domain.VideoAccessTopN;
import com.imooc.utils.MySQLUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Program to interfaces
 */
public class VideoAccessTopNDAO {

    static Map<String, String> courses = new HashMap<String, String>();

    static {
        courses.put("4000", "MySQL Optimization");
        courses.put("4500", "Crontab");
        courses.put("4600", "Swift");
        courses.put("14540", "SpringData");
        courses.put("14704", "R");
        courses.put("14390", "Machine Learning");
        courses.put("14322", "redis");
        // NOTE: 14390 is used twice; this entry overwrites "Machine Learning" above
        courses.put("14390", "Neural Networks");
        courses.put("14623", "Docker");
    }

    /**
     * Look up the course name for a course id.
     * Unknown ids fall back to the id itself, so every pie slice still gets a label.
     */
    public String getCourseName(String id) {
        String name = courses.get(id);
        return name != null ? name : id;
    }

    /**
     * Query the 5 most popular courses of the given day
     * @param day the day to query, e.g. 20170511
     */
    public List<VideoAccessTopN> query(String day) {
        List<VideoAccessTopN> list = new ArrayList<VideoAccessTopN>();

        Connection connection = null;
        PreparedStatement psmt = null;
        ResultSet rs = null;

        try {
            connection = MySQLUtils.getConnection();
            String sql = "select cms_id, times from day_video_access_topn_stat where day = ? order by times desc limit 5";
            psmt = connection.prepareStatement(sql);
            psmt.setString(1, day);

            rs = psmt.executeQuery();

            VideoAccessTopN domain = null;
            while (rs.next()) {
                domain = new VideoAccessTopN();
                /**
                 * TODO... The page should show course names, but the query only returns course ids.
                 *
                 * How do we get a course name from its id?
                 * Ids map to names, and that mapping is normally stored in a relational database;
                 * here it is hard-coded in the courses map.
                 */
                domain.setName(getCourseName(String.valueOf(rs.getLong("cms_id"))));
                domain.setValue(rs.getLong("times"));

                list.add(domain);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            MySQLUtils.release(connection, psmt, rs);
        }
        return list;
    }

    // Quick check: print the top 5 courses of 20170511
    public static void main(String[] args) {
        VideoAccessTopNDAO dao = new VideoAccessTopNDAO();
        List<VideoAccessTopN> list = dao.query("20170511");
        for (VideoAccessTopN result : list) {
            System.out.println(result.getName() + " , " + result.getValue());
        }
    }
}[/mw_shl_code]
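In the DAO, course id 14390 is put into the map twice, so the second name silently overwrites the first. The TODO notes that this mapping would normally live in a relational database. Until then, while it is hard-coded, a small guard can catch such mistakes. The CourseCatalog class below is hypothetical. Falling back to the raw id for unknown courses is a design choice, not something the original specifies.

[mw_shl_code=java,true]import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CourseCatalog {

    private final Map<String, String> names = new HashMap<>();

    // Registers a course; refuses to overwrite an id that is already mapped
    public CourseCatalog register(String id, String name) {
        Objects.requireNonNull(id, "id");
        Objects.requireNonNull(name, "name");
        String previous = names.putIfAbsent(id, name);
        if (previous != null) {
            throw new IllegalStateException("course id " + id + " already mapped to " + previous);
        }
        return this;
    }

    // Looks up a course name, falling back to the id itself for unknown courses
    public String nameOf(String id) {
        String name = names.get(id);
        return name != null ? name : id;
    }

    public static void main(String[] args) {
        CourseCatalog catalog = new CourseCatalog()
                .register("4000", "MySQL Optimization")
                .register("14390", "Machine Learning");
        System.out.println(catalog.nameOf("4000") + ", " + catalog.nameOf("99999"));
        try {
            // The duplicate from the hard-coded map is rejected instead of overwriting
            catalog.register("14390", "Neural Networks");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}[/mw_shl_code]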
Web layer:
[mw_shl_code=java,true]package com.imooc.web;

import com.imooc.dao.VideoAccessTopNDAO;
import com.imooc.domain.VideoAccessTopN;
import net.sf.json.JSONArray;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

/**
 * Most popular TopN courses
 *
 * Web ==> Service ==> DAO
 */
public class VideoAccessTopNServlet extends HttpServlet {

    private VideoAccessTopNDAO dao;

    @Override
    public void init() throws ServletException {
        dao = new VideoAccessTopNDAO();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String day = req.getParameter("day");

        List<VideoAccessTopN> results = dao.query(day);
        // json-lib serializes each bean through its getters: [{"name":...,"value":...}, ...]
        JSONArray json = JSONArray.fromObject(results);

        // The response body is JSON, so declare it as such
        resp.setContentType("application/json;charset=utf-8");

        PrintWriter writer = resp.getWriter();
        writer.println(json);
        writer.flush();
        writer.close();
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        this.doGet(req, resp);
    }
}
[/mw_shl_code]
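The servlet passes the day request parameter straight to the DAO, so a missing or malformed value (for example /stat without ?day=) simply produces an empty result. Here is a minimal sketch of validating it first. It assumes days always use the eight-digit form seen in this project (20170511). The class and method names are hypothetical. When the check fails, the servlet could respond with resp.sendError(HttpServletResponse.SC_BAD_REQUEST).

[mw_shl_code=java,true]import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;

public class DayParam {

    // "uuuu" (not "yyyy") is needed with STRICT resolving; STRICT rejects dates such as 20170231
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("uuuuMMdd").withResolverStyle(ResolverStyle.STRICT);

    // True if day is exactly eight digits that form a real calendar date
    public static boolean isValidDay(String day) {
        if (day == null || !day.matches("\\d{8}")) {
            return false;
        }
        try {
            LocalDate.parse(day, DAY);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String day : new String[] {"20170511", "20170231", "2017-05-11", null}) {
            System.out.println(day + " -> " + isValidDay(day));
        }
    }
}[/mw_shl_code]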
Dynamic ECharts pie chart, part 2: front-end development
web.xml configuration:
[mw_shl_code=xml,true]<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
  <display-name>Archetype Created Web Application</display-name>

  <servlet>
    <servlet-name>stat</servlet-name>
    <servlet-class>com.imooc.web.VideoAccessTopNServlet</servlet-class>
  </servlet>

  <servlet-mapping>
    <servlet-name>stat</servlet-name>
    <url-pattern>/stat</url-pattern>
  </servlet-mapping>
</web-app>[/mw_shl_code]
Page for TopN requirement 1:
[mw_shl_code=text,true]<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Echarts HelloWorld</title>
    <!-- Include ECharts and jQuery -->
    <script src="js/echarts.min.js"></script>
    <script src="js/jquery.js"></script>
</head>
<body>
    <!-- Prepare a DOM element with an explicit size (width and height) for ECharts -->
    <div id="main" style="width: 600px;height:400px;"></div>
    <script type="text/javascript">
        // Initialize an ECharts instance on the prepared DOM element
        var myChart = echarts.init(document.getElementById('main'));

        // Chart options; the pie data is loaded from the /stat servlet
        var option = {
            title : {
                text: 'Most Popular TopN Courses on the Main Site',
                x:'center'
            },
            tooltip : {
                trigger: 'item',
                formatter: "{a} <br/>{b} : {c} ({d}%)"
            },
            legend: {
                orient: 'vertical',
                left: 'left',
                data: []
            },
            series : [
                {
                    name: 'Visits',
                    type: 'pie',
                    radius : '55%',
                    center: ['50%', '60%'],
                    data:(function(){
                        var courses = [];
                        // Synchronous request (async:false) so the data is ready before
                        // setOption runs; browsers deprecate synchronous XHR, so calling
                        // setOption inside the success callback is the cleaner option
                        $.ajax({
                            type:"GET",
                            url:"/stat?day=20170511",
                            dataType:'json',
                            async:false,
                            success:function(result) {
                                // result is an array of {name, value} objects
                                for(var i=0;i<result.length; i++){
                                    courses.push({"value": result[i].value, "name": result[i].name});
                                }
                            }
                        });
                        return courses;
                    })(),
                    itemStyle: {
                        emphasis: {
                            shadowBlur: 10,
                            shadowOffsetX: 0,
                            shadowColor: 'rgba(0, 0, 0, 0.5)'
                        }
                    }
                }
            ]
        };

        // Render the chart with the options and data specified above
        myChart.setOption(option);
    </script>
</body>
</html>[/mw_shl_code]
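Displaying the statistics with Zeppelin
Download Zeppelin:
http://zeppelin.apache.org/download.html
After extracting the archive, go into the bin directory and start it, e.g. with ./zeppelin-daemon.sh start.
The default port is 8080; once Zeppelin has started, open http://localhost:8080 in a browser.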
Source: CSDN. Author: -无妄-. Original article: "Spark SQL in Practice - User Behavior Logs" https://blog.csdn.net/bingdianone/article/details/85013293#imooc_44