问题导读
1.如何离线采集数据?
2.Hive和HBase如何关联?
3.如何求出热点话题Top10?
4.如何实时采集数据?
5.实时处理数据如何存入Redis中?
本次项目是基于企业大数据经典案例项目(大数据日志分析),全方位、全流程讲解大数据项目的业务分析、技术选型、架构设计、集群规划、安装部署、整合继承与开发和web可视化交互设计。
一、业务需求分析
(一)捕获用户浏览日志信息
(二)实时分析前20名流量最高的新闻话题
(三)实时统计当前线上已曝光的新闻话题
(四)统计哪个时段用户浏览量最高
二、系统架构图设计
三、系统数据流程设计
四、集群资源规划设计
五、数据结构
六、步骤详解
6.1数据的采集
6.1.1离线采集
Flume+HBase+Hive
使用HBase?
首先数据是实时的用户查询日志信息,为了做离线的统计,Flume+HDFS:需要将数据存储HDFS,写入的压力非常大,单位时间内写入的数据量比较大,直接向HDFS中写入的效率比较低,而HBase,高速随机读写的数据库,使用HBase去接收Flume传送过来的数据,当数据被传入HBase之后,可以使用Hive去关联HBase中的数据,之后就可以做一些统计分析的工作。
flume-sinks是HBase的时候:
payloadColume: 获取到要写入到Hbase中的列,如果没有做列的配置,默认值就是pCol
incrementColume: 特有的列,不参与计算,系统保有,如果这一列缺省,同样表中只有有一列内容,而且在操作过程中很有可能报错
flume+hbase整合中默认的行键类型有四种,默认为uuid
timestamp: 精度到毫秒 10^-3
random: 随机数
nano: 纳米 10^-9
6.1.1.1关联Flume+HBase
使用sink是hbase或者asynchbase,但是这两种方式其中定义EventSerializer满足不了需求,日志数据有6列,数据以\t分割,但是默认的hbasesink只能写入到一列中,所以可以使用正则或者自定义hbase-sink。
使用正则表达式:
Flume提供了两个序列化器。
SimpleHbaseEventSerializer(org.apache.flume.sink.hbase.SimpleHbaseEventSerializer)按原样将事件主体写入HBase,并可选择增加Hbase中的列。RegexHbaseEventSerializer(org.apache.flume.sink.hbase.RegexHbaseEventSerializer)根据给定的正则表达式打破事件体,并将每个部分写入不同的列。
[mw_shl_code=text,true]#define sinks(未测试)
a1.sinks.k1.type = org.apache.flume.sink.hbase.HBaseSink
#a1.sinks.k1.type = hbase
a1.sinks.k1.table = new_log
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# \t 匹配一个制表符。等价于 \x09 和 \cI。
a1.sinks.k1.serializer.regex = (.*?)\x09(.*?)\x09(.*?)\x09(.*?)\x09(.*?)\x09(.*?)
a1.sinks.k1.serializer.colNames = datetime,userid,searchname,retorder,cliorder,cliurl
[/mw_shl_code]
自定义flume-hbase-sink:
修改flume的源码包flume-hbase-sink model,添加一下MyAsyncHbaseEventSerializer 类和SimpleRowKeyGenerator的自定义方法
[mw_shl_code=java,true]/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.sink.hbase;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* 自定义的Flume-Hbase-Sink,主要完成的功能:
* 1. 可以指定任意的分隔符
* 2. 可以完成任意的列内容的分割
* 3. 自定义行键
*/
public class MyAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
private byte[] table;
private byte[] cf;
private byte[] payload;
private byte[] payloadColumn;
private String separator; //自定义的分隔符
private byte[] incrementColumn;
private String rowPrefix;
private byte[] incrementRow;
private KeyType keyType;
@Override
public void initialize(byte[] table, byte[] cf) {
this.table = table;
this.cf = cf;
}
@Override
public List<PutRequest> getActions() {
List<PutRequest> actions = new ArrayList<PutRequest>();
if (payloadColumn != null) {
try {
String[] columns = new String(payloadColumn).split(",");
String[] values = new String(payload).split(separator);
System.out.println("------------>columns: "+ Arrays.toString(columns));
System.out.println("------------>values: "+ Arrays.toString(values));
byte[] rowKey = SimpleRowKeyGenerator.getMyRowKey(values[0],values[1]);
for (int i = 0; i < columns.length; i++){
byte[] column = columns.getBytes(Charsets.UTF_8);
byte[] value = values.getBytes(Charsets.UTF_8);
PutRequest putRequest = new PutRequest(table, rowKey, cf, column, value);
actions.add(putRequest);
}
} catch (Exception e) {
throw new FlumeException("Could not get row key!", e);
}
}
return actions;
}
public List<AtomicIncrementRequest> getIncrements() {
List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
if (incrementColumn != null) {
AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
incrementRow, cf, incrementColumn);
actions.add(inc);
}
return actions;
}
@Override
public void cleanUp() {
// TODO Auto-generated method stub
}
@Override
public void configure(Context context) {
//获取到要写入到hbase中的列,如果没有做列的配置,默认值就是pCol
String pCol = context.getString("payloadColumn", "pCol");
String sep = context.getString("separator", ",");
//特有的列,不参与计算,系统保有,如果这一列缺省,同样表中只有有一列内容,这样的话,而且在操作过程中很可能报错
String iCol = context.getString("incrementColumn", "iCol");
rowPrefix = context.getString("rowPrefix", "default");
String suffix = context.getString("suffix", "uuid");
if (pCol != null && !pCol.isEmpty()) {
//获取了所有的列,交给成员标量payloadColumn
payloadColumn = pCol.getBytes(Charsets.UTF_8);
}
if(sep != null && !sep.isEmpty()){
//指定分隔符
separator = sep;
System.out.println("-----------separator: "+separator +"--------------------------");
}
if (iCol != null && !iCol.isEmpty()) {
incrementColumn = iCol.getBytes(Charsets.UTF_8);
}
incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
}
@Override
public void setEvent(Event event) {
this.payload = event.getBody();
}
@Override
public void configure(ComponentConfiguration conf) {
// TODO Auto-generated method stub
}
}[/mw_shl_code]
SimpleRowKeyGenerator类的一个自定义方法:
[mw_shl_code=java,true]SimpleRowKeyGenerator类的一个自定义方法:
/**
* 自定义行键,必须要跟业务相关,同时在定义过程中要尽量避免出现hbase热点问题
*
* @param datetime 00:00:00
* 考虑到热点问题,如果出现时间戳,手机号等待类似连续的数据,尽量倒置
* @param userid 23908140386148713
* @return
* @throws UnsupportedEncodingException
*/
public static byte[] getMyRowKey(String datetime, String userid) throws UnsupportedEncodingException {
return (userid + "_" + datetime).getBytes("UTF8");
}
[/mw_shl_code]
编译打包:mvn clean package -DskipTests
将项目target目录下面的flume-ng-hbase-sink-1.8.0.jar 替换掉集群中的flume中对应的jar包
然后
配置flume-hbase-sink.conf:
[mw_shl_code=text,true]#########################################################
##
##主要作用是文件中的新增内容,将数据打入到HBase中
## 注意:Flume agent的运行,主要就是配置source channel sink
## 下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
#对于source的配置描述 监听文件中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/projects/news/data/news-logs.log
#对于sink的配置描述 使用hbase做数据的消费
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = new_log
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.MyAsyncHbaseEventSerializer
# 将flume采集到的数据,写入到hbase表中对应的列
a1.sinks.k1.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl
# 自定义的分隔符
a1.sinks.k1.serializer.separator = \\t
#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/data/projects/news/checkpoint
a1.channels.c1.dataDirs = /home/hadoop/data/projects/news/channel
#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1[/mw_shl_code]
启动flume:
[mw_shl_code=shell,true] nohup bin/flume-ng agent -n a1 -c conf -f conf/flume-hbase-sink.conf >/dev/null 2>&1 &[/mw_shl_code]
测试: 进入到hbase shell >>scan 'new_log'
6.1.1.2 hive和habse关联
[参考hive官网中hbase集成]https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration#HBaseIntegration-HiveHBaseIntegration
Here’s an example which instead targets a distributed HBase cluster where a quorum of 3 zookeepers is used to elect the HBase master:
[mw_shl_code=shell,true]
$HIVE_SRC/build/dist/bin/hive --auxpath $HIVE_SRC/build/dist/lib/hive-hbase-handler-0.9.0.jar,$HIVE_SRC/build/dist/lib/hbase-0.92.0.jar,$HIVE_SRC/build/dist/lib/zookeeper-3.3.4.jar,$HIVE_SRC/build/dist/lib/guava-r09.jar --hiveconf hbase.zookeeper.quorum=zk1.yoyodyne.com,zk2.yoyodyne.com,zk3.yoyodyne.com[/mw_shl_code]
创建一个hive的外部表
[mw_shl_code=sql,true]CREATE EXTERNAL TABLE new_log(
id string,
datetime string,
userid string,
searchname string,
retorder int,
cliorder int,
cliurl string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:datetime,cf:userid,cf:searchname,cf:retorder,cf:cliorder,cf:cliurl")
TBLPROPERTIES ("hbase.table.name" = "new_log", "hbase.mapred.output.outputtable" = "new_log");[/mw_shl_code]
注意:
hbase.columns.mapping 是必须要配置的。 hbase.table.name 配置是可选的,它允许使用Hive中不同表名的表去操作HBase中已知的一张表。如:hive中已知的表 hbase_table_1 可以操作HBase中的表。如果未指定,Hive和HBase的表名称需要相同。 hbase.mapred.output.outputtable 配置是可选的,如果计划向表中插入数据(使用 hbase.mapreduce.TableOutputFormat操作该属性),则需要它。
问题1:
使用select * from new_log limit 5; 可以查到数据,但是使用select count(*) from new_log;统计的数据量为0,
解决方法:进入hive shell,输入set hive.compute.query.using.stats=false,然后运行下查询语句,发现正常。
原因:hive.compute.query.using.stats 默认值: false,当set hive.compute.query.using.stats设置为true时,hive在进行一些如:min,max和count(1)的查询的时候,将使用存储在Metastore中的统计信息来回答。对于基本统计信息收集,需要将配置属性hive.stats.autogather设置为true。详细请参照:hive官网配置参数
问题2:
使用HBase和Hive查询出来的数据量是253637,而flume源news-logs.log的数据量是1724258,剩下的数据去哪了?
???
[mw_shl_code=text,true]追踪源码 追踪日志信息 通过分析 版本不匹配 时间不同步 存在脏数据
打印非常详细的日志
源码中定义一个静态变量Long统计数据条数
考虑问题的思路:
1. 日志输出有没有问题
2. 代码有没有问题
3. 服务器是否有问题
4. 各个软件之间的兼容性是否有问题
5. 编写的程序的稳定性
[/mw_shl_code]
6.1.1.3 相关的查询分析操作
1.求出热点话题Top10(对应实时的统计:实时分析前20名流量最高的新闻话题)
[mw_shl_code=sql,true]select searchname, count(1) count from new_log group by searchname distribute by searchname sort by count desc limit 10;
[/mw_shl_code]
复习:
hive的排序方式:
oder by: 全局排序,尽量少用,会将所有的数据先拉到一起来排序,相当于是生成一个reduce作业,要尽量去避免只有一个reduce作业的情况。
distribute by: 按照某个字段进行分区
sort by: 针对某一个分区的排序
distribute by 和 sort by 联合在一起可以达到order by 的效果
有一个: cluster by ,但是比较局限,只支持asc,不能在cluster by后加一个desc,而且要求分区字段和排序字段相同。
group by 只是分组,没有分区和排序的功能。
2.统计当前线上已曝光的新闻话题
[mw_shl_code=sql,true]select distinct searchname from new_log;
[/mw_shl_code]
3.离线统计各个时段的新闻浏览量(统计哪个时段用户浏览量最高)
[mw_shl_code=sql,true]select
hour(datetime) hour,
count(1) count
from new_log
group by hour(datetime)
distribute by hour sort by count desc
limit 10;
[/mw_shl_code]
6.1.1.3 将离线的统计结果导入到HBase(可以考虑建立二级索引)
为了方便后期数据可视化:
创建一张结果输出表,用于存储hive统计的结果,这张表关联hbase。
[mw_shl_code=sql,true]CREATE TABLE hour_news_count (
hour string,
count int
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:count")
TBLPROPERTIES ("hbase.table.name" = "hour_news_count", "hbase.mapred.output.outputtable" = "hour_news_count");
[/mw_shl_code]
导入数据集:
[mw_shl_code=sql,true]insert into hour_news_count
select
hour(datetime) hour,
count(1) count
from new_log
group by hour(datetime)
distribute by hour sort by count desc
limit 10;
[/mw_shl_code]
数据导入到HBase或者Hive或者mysql或者redis之后,就可以进行数据可视化了。最后使用SSM+Echars+H5展示即可。
实时采集
6.1.2.1 Flume+Kafka
kafka-sink中设置数据确认: 0: 不等待, 1: 只要有leader即可, -1: 需要所有副本都写入成功
kafka.producer.acks 1 How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.
首先在Kafka里面创建一个topic:
[mw_shl_code=shell,true]kafka-topics.sh --create --topic news-logs --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --partitions 3 --replication-factor 2
[/mw_shl_code]
指定flume-kafka-sink.conf配置文件(flume1.8不支持kafka2.1):
[mw_shl_code=text,true]#########################################################
##
##主要作用是监听文件中的新增数据,采集到数据之后,导入到kafka
## 注意:Flume agent的运行,主要就是配置source channel sink
## 下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#对于source的配置描述 监听文件中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/projects/news/data/news_log_rt.log
#对于sink的配置描述 使用kafka日志做数据的消费
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.topic = news-logs
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#a1.sinks.k1.kafka.producer.compression.type = snappy
#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[/mw_shl_code]
6.2 数据的统计分析
6.2.1 使用的软件架构:
SparkStreaming+Kafka+web+scala+java+maven+hbase/mysql/redis
6.2.2 项目创建
小技巧:在创建maven项目时,添加一个kv键值对,key=archetypeCatalog,value=local|internet,建议使用local,默认是internet。创建的maven项目是由原型,采用默认每次都要从互联网去下载原型,网络不好时,下载效率非常慢;使用local第一次会将网络上的原型下载到本地仓库,后期直接使用本地仓库中的原型,效率高
6.2.3 实时任务的统计
1. 先获取kafka中的数据:
receiver V.S. Direct
在流计算中有三种语义需要处理:
receiver情况 | receiver问题 | at least once: 至少一次 | 一条记录被重复消费 | at most once: 最多一次 | 一条记录由于失败的原因没有被成功消费 | exactly once: 恰好一次 | 保证数据的完整 | 之前解决方法: | kafka --> zk记录offset, sparkstreming,如何解决receiver带来的问题: 开启checkpoint,开启wal | 现在的解决方法: | KafkaUtils.createDirectStream:保证数据的完整 |
分布式中遵循的原则: 移动计算不移动数据
数据的本地性: 数据和计算它的代码之间的距离
- process: 进程级别(数据和代码在同一个进程)
- node: 数据和代码在同一节点的不同进程中
- rack: 不同节点,机架
[mw_shl_code=scala,true]def createStream(ssc: StreamingContext, topics: String): DStream[String] = {
/**
* ConsumerStrategy: 主要就是用来指定kafka的相关配置信息的。
* "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
* "key.deserializer" -> classOf[StringDeserializer],
* "value.deserializer" -> classOf[StringDeserializer],
* "group.id" -> "use_a_separate_group_id_for_each_stream",
* "auto.offset.reset" -> "latest",
* "enable.auto.commit" -> (false: java.lang.Boolean)
*
*/
val kafkaParams = Map(
"bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
// "bootstrap.servers" -> "hadoop:19092,hadoop:29092,hadoop:39092",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> "news_logs_topgroup"
)
/**
* LocationStrategy: 主要指定的数据加载的策略
* 主要是读取数据性能问题的考虑,选择不同的数据本地性级别来加载数据。
* 核心原因在于,新版的api要预先fentch一批数据放到buffer
* PreferBrokers: 如果brokers和spark作业的executor在相同的节点的时候,可以选择这种方式。
* PreferConsistent: 最常用的一种数据加载策略,为每一个executor,只会去分发不同partition中的数据。
* PreferFixed: 固定的方式,在某些host上加载不同的partition,
* PreferFixed(hostMap:collection.Map[TopicPartition,String])
* Map[TopicPartition,String](
* (new TopicPartition("news-logs",0) -> "hadoop03"),
* (new TopicPartition("news-logs",1) -> "hadoop01"),
* (new TopicPartition("news-logs",2) -> "hadoop02")
* )
* Topic:news-logs PartitionCount:3 ReplicationFactor:2 Configs:
* Topic: news-logs Partition: 0 Leader: 3 Replicas: 3,1 Isr: 1,3
* Topic: news-logs Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
* Topic: news-logs Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
*/
val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(topics.split(","), kafkaParams)
)
val messages: DStream[String] = kafkaStream.map(record => record.value())
messages
[/mw_shl_code]
2.实时分析前20名流量最高的新闻话题
(1)需求分析: 实际上求的每一个新闻话题的wordcount,然后排序,求最高的前20,实时分析前20名的流量最高,就要计算截止到目前为止的数据流量,不是单一批次
(2)使用算子: updataStateByKey,需要启动一个checkpoint的位置,用来存储截止到目前为止之前的数据
截止到目前的所有话题的浏览量
总的数据量 = 当前批次的数据量 + 之前的数据量
之前的数据: A —> 10 B —> 11
新来了一批数据: C —> 2 B —> 1
updateFunc: (Seq[V], Option[S]) 相当于 a left join b a表所有的都显示,b表中对应不上的显示null
(3)新闻话题就是log中的第三列: searchname
(4)前20名 —> 发现streaming无法进行排序,要么是用core,要么是用sql进行排序
DStream.foreach( rdd --> smDF:rdd.toDF --> createOrReplaceTempView -->retDF: sql --> retDF.write.mode(SaveMode.Overwrite).jdbc
3.实时统计当前线上已曝光的新闻话题
[mw_shl_code=sql,true]select distinct searchname from t; 略
[/mw_shl_code]
4.统计哪个时段用户浏览量最高统计哪个时段用户浏览量最高
步骤:
- 读取kafka的数据 --> createStream(ssc, topics)
- 统计各个小时的浏览量 --> updateStateByKey
- 使用sparkStreaming直接将结果落地到mysql数据库
[mw_shl_code=scala,true] usbDStream.foreachRDD(rdd =>{
if(! rdd.isEmpty()){
rdd.foreachPartition(partition => {
if(!partition.isEmpty){
/* val qr = new QueryRunner(DBCPUtils.getDataSource)
val sql = "INSERT OVERWRITE b_hour_topics values(?, ?)"
partition.foreach{
case (hour, count) =>{
val objs = Array(hour, count)
qr.update(sql,objs :_ *)
}
}
qr.batch(sql,params)*/
classOf[com.mysql.jdbc.Driver]
val connection = DriverManager.getConnection(DBCPUtils.url,DBCPUtils.username, DBCPUtils.password)
val sql = "INSERT INTO b_hour_topics values(?, ?)"
val ps = connection.prepareStatement(sql)
partition.foreach{
case(hour, count) => {
ps.setString(1,hour)
ps.setInt(2,count)
ps.addBatch()
}}
ps.executeBatch()
ps.close()
connection.close()
}
})
}
})
[/mw_shl_code]
但是在操作过程中发现出现大量的数据重复,数据乱码等存在一些问题,故采取换一种方法:
配置文件(dbcp-config.properties):
[mw_shl_code=text,true]#连接设置
driverClassName=com.mysql.jdbc.Driver
#在mysql5.8中需要加useSSL=false
url=jdbc:mysql://hadoop01:3306/news_db?useSSL=false
username=root
password=root
#<!-- 初始化连接 -->
initialSize=10
#最大连接数量
maxActive=50
#<!-- 最大空闲连接 -->
maxIdle=20
#<!-- 最小空闲连接 -->
minIdle=5
#<!-- 超时等待时间以毫秒为单位 6000毫秒/1000等于60秒 -->
maxWait=60000
#JDBC驱动建立连接时附带的连接属性属性的格式必须为这样:[属性名=property;]
#注意:"user" 与 "password" 两个属性会被明确地传递,因此这里不需要包含他们。
connectionProperties=useUnicode=true;characterEncoding=utf8
#指定由连接池所创建的连接的自动提交(auto-commit)状态。
defaultAutoCommit=true
[/mw_shl_code]
先获取JDBC连接:
[mw_shl_code=java,true]package rk.news.utils;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
/**
* @Author rk
* @Date 2018/12/1 14:59
* @Description:
* dbcp和p0都是常用的数据库的连接池
**/
public class DBCPUtils {
private static DataSource ds;
private DBCPUtils(){}
public static String url;
public static String username;
public static String password;
static {
try {
Properties properties = new Properties();
String path = "dbcp-config.properties";
InputStream in = DBCPUtils.class.getClassLoader().getResourceAsStream(path);
properties.load(in);
url = properties.getProperty("url");
username = properties.getProperty("username");
password = properties.getProperty("password");
ds = BasicDataSourceFactory.createDataSource(properties);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
public static DataSource getDataSource(){
return ds;
}
public static Connection getConnection(){
try {
return ds.getConnection();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
public static void release(Connection con, Statement st, ResultSet rs){
try {
if(rs != null) {
rs.close();
}
} catch (SQLException e) {
e.printStackTrace();
}finally {
try {
if (st != null) {
st.close();
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
try {
if (con != null) {
con.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
[/mw_shl_code]
封装对象HourTopics :
[mw_shl_code=java,true]package rk.news.entity;
/**
* @Author rk
* @Date 2018/12/3 17:04
* @Description:
* create table b_hour_top(
* hour varchar(10),
* times int
* );
**/
public class HourTopics {
private String hour;
private int times;
public String getHour() {
return hour;
}
public void setHour(String hour) {
this.hour = hour;
}
public int getTimes() {
return times;
}
public void setTimes(int times) {
this.times = times;
}
}
[/mw_shl_code]
统计各时间TopN接口(IHourTopicsDao ):
[mw_shl_code=java,true]public interface IHourTopicsDao {
void insert(HourTopics ht);
void insertBatch(List<HourTopics> hts);
}
[/mw_shl_code]
统计各时间实现类(DefaultHourTopicsDaoImpl):
[mw_shl_code=java,true]public class DefaultHourTopicsDaoImpl implements IHourTopicsDao {
private QueryRunner qr = new QueryRunner(DBCPUtils.getDataSource());
String insertSQL = "INSERT INTO b_hour_topics (hour, times) values(?,?)";
String updateSQL = "UPDATE b_hour_topics SET times = ? WHERE hour = ? ";
String selectSQL = "SELECT times FROM b_hour_topics WHERE hour = ?";
@Override
public void insert(HourTopics ht) {
try {
//首先查看当前小时对应的数据是否存在,如果不存在,则插入,如果存在,覆盖
Integer times = qr.query(selectSQL, new ScalarHandler<>(), ht.getHour());
if (times == null ){//数据库中没有
qr.update(insertSQL, ht.getHour(), ht.getTimes());
}else{
qr.update(updateSQL, ht.getTimes(), ht.getHour());
}
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
public void insertBatch(List<HourTopics> hts) {
//进行插入的数据
List<HourTopics> insertList = new ArrayList<>();
//更新的数据
List<HourTopics> updateList = new ArrayList<>();
try {
//一条一条的判断当前数据库中是否已经存在对应的数据
for(HourTopics ht : hts){
Integer times = qr.query(selectSQL, new ScalarHandler<>(), ht.getHour());
if (times == null ){//数据库中没有
insertList.add(ht);
}else{
updateList.add(ht);
}
}
//执行插入
if(!insertList.isEmpty()){
Object[][] insertParams = new Object[insertList.size()][];
for (int i = 0; i < insertList.size(); i++){
HourTopics ht = insertList.get(i);
Object[] obj = {ht.getHour(),ht.getTimes()};
insertParams = obj;
}
qr.batch(insertSQL,insertParams);
}
//执行更新
if(!updateList.isEmpty()){
Object[][] updateParams = new Object[updateList.size()][];
for (int i = 0; i < updateList.size(); i++){
HourTopics ht = updateList.get(i);
Object[] obj = {ht.getTimes(),ht.getHour()};
updateParams = obj;
}
qr.batch(updateSQL,updateParams);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
[/mw_shl_code]
小技巧:TRUNCATE和DELETE的区别:
TRUNCATE和DELETE的区别 | 当加入表中有1000w条记录 | TRUNCATE TABLE ‘b_hour_topics’; | 执行 2条:第一条 drop talbe 第二条: create table | DELETE FROM ‘b_hour_topics’; | 执行1000w次 |
在实时或准实时统计中,不建议将数据落地到mysql中,高频操作,mysql服务承载压力有限,如果可以到达大概每秒10w条,可以考虑,一般建议将结果落地到hbase, redis, es, ignite等。这里我们使用redis,将话题数据插入到redis中。
但是插入到数据库是一个action操作,所以需要先把数据进行持久化。
[mw_shl_code=text,true] /**
* MEMORY_ONLY: 效率最快,最消耗资源
* 数据是没有经过序列化保存在内存中,容易造成OOM,gc的频率和object的个数成正比 不建议使用(100条以内可以考虑)
* MEMORY_ONLY_SER: 针对上述的优化,数据经过序列化存储,一个partition的数据就只有一个object(默认3M以上进入老年代)
* 这里的性能消耗在于:序列化和反序列化 kryo
* MERORY_AND_DISK (不建议)
* MEMORY_AND_DISK_SER(建议)
* DISK_ONLY(不用)
* MEMORY_ONLY_2,MEMORY_AND_DISK_2(不建议,除非对数据的容错性要求非常高)
* OFF_HEAP(experimental) 堆外内存: 以上数据都要占用executor的内存
* executor中的 spark.storge.memoeyFraction: 0.6(持久化的数据,占到了executor内存的60%)
* spark.shuffle.memoeyFraction: 0.2(shuffle,占到了executor内存的20%)
*
* alluxio(tachyon --> 基于内存的hdfs版本)
*/
batchTopics2Count.persist(StorageLevel.MEMORY_ONLY)
[/mw_shl_code]
[mw_shl_code=text,true] /**
* 将话题数据插入到redis中
*/
batchTopics2Count.foreachRDD(rdd => {
if(! rdd.isEmpty()){
rdd.foreachPartition(partition =>{
if(! partition.isEmpty){
val jedis = JedisUtil.getJedis
partition.foreach{case (topic, topicNum) =>{
jedis.sadd(Constants.NEW_TOPICS, topic)
}}
JedisUtil.close(jedis)
}
})
}
})
[/mw_shl_code]
之后再将数据写入到redis
首先编写一个配置文件:
[mw_shl_code=text,true]########################################
##
##这个是redis的配置文件
##
########################################
#host or ip
host=hadoop01
#port redis connect port
port=6379
##最大空闲连接树
maxIdle=10
##最大连接数
maxTotal=100
##创建连接超时时间
maxWaitMillis=2000
##获取连接测试是否可用
testOnBorrow=true
##超时时间
timeout=20000
##认证面
#password=hadoop
[/mw_shl_code]
然后通过配置文件获取一个Jedis连接:
[mw_shl_code=java,true]package rk.news.utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import rk.news.conf.RedisConfig;
import java.io.IOException;
import java.util.Properties;
/**
* @Author rk
* @Date 2018/12/3 19:02
* @Description:
**/
public class JedisUtil {
private static JedisPool jedisPool;
private JedisUtil(){}
private static Properties prop;
static {
prop = new Properties();
try {
prop.load(JedisUtil.class.getClassLoader().getResourceAsStream("redis.conf"));
} catch (IOException e) {
e.printStackTrace();
}
}
public static Jedis getJedis(){
if(jedisPool == null){
synchronized (JedisUtil.class){
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxIdle(Integer.valueOf(prop.getProperty(RedisConfig.MAX_IDLE)));
poolConfig.setMaxTotal(Integer.valueOf(prop.getProperty(RedisConfig.MAX_TOTAL)));
poolConfig.setMaxWaitMillis(Integer.valueOf(prop.getProperty(RedisConfig.MAX_WAIT_MILLIS)));
jedisPool = new JedisPool(poolConfig,
prop.getProperty(RedisConfig.HOST),
Integer.valueOf(prop.getProperty(RedisConfig.PORT)),
Integer.valueOf(prop.getProperty(RedisConfig.TIME_OUT)));
}
}
return jedisPool.getResource();
}
public static void close(Jedis jedis){
if (jedis != null){
jedis.close();
}
}
}
[/mw_shl_code]
获取一个redis的key:
[mw_shl_code=java,true]package rk.news.conf;
/**
* @Author rk
* @Date 2018/12/3 19:09
* @Description:
**/
public class Constants {
public static String NEW_TOPICS = "news.topics";
}
[/mw_shl_code]
写入到redis中:
[mw_shl_code=java,true] /**
* 将话题数据插入到redis中
* 实时或准实时统计中,不建议将数据落地到mysql中,高频操作,mysql服务承载压力有限,如果可以到达大概每秒10w条,可以考虑
* 所以建议,可以将结果落地到hbase, redis, es, ignite等
*
*/
batchTopics2Count.foreachRDD(rdd => {
if(! rdd.isEmpty()){
rdd.foreachPartition(partition =>{
if(! partition.isEmpty){
val jedis = JedisUtil.getJedis
partition.foreach{case (topic, topicNum) =>{
jedis.sadd(Constants.NEW_TOPICS, topic)
}}
JedisUtil.close(jedis)
}
})
}
})
[/mw_shl_code]
后面获取到redis中的数据:
[mw_shl_code=java,true] /**
* redis基本数据:
* string: len set get
* list: lpush
* hash: hget
* set: sadd smembers scard: 获取长度
* zset: z
*/
@Override
public long getAllTopics() {
Jedis jedis = JedisUtil.getJedis();
Long totalTopics = jedis.scard(Constants.NEW_TOPICS);
JedisUtil.close(jedis);
return totalTopics;
}
[/mw_shl_code]
代码下载:
NewsProject.zip
(290.1 KB, 下载次数: 1, 售价: 1 云币)
|
|