Reading Guide
1. Which technologies does this article use?
2. What functionality does it implement?
3. Which Spark features does the code rely on?
1. Project Analysis
Project repository: https://gitee.com/jenrey/adv
Technology:
Spark Streaming or Storm (Spark Streaming is used here)
Data:
advertisement click-stream data
Requirements:
1) In real time, compute the daily hot advertisements per province (group by province and take the top N by click count).
2) In real time, track the advertising click trend over a given period.
Data schema:
timestamp: the time at which the user clicked the ad
province: the province in which the user clicked the ad
city: the city in which the user clicked the ad
userid: the unique identifier of the user
advid: the id of the clicked advertisement
The source data already arrives in Kafka.
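Each Kafka message is assumed to be one comma-separated line in the field order above. A minimal sketch of what a record looks like and how the code below splits it (the sample values are invented for illustration):
[mw_shl_code=scala,true]// Hypothetical sample record: timestamp,province,city,userid,advid
val line = "1527426000000,guangdong,shenzhen,1001,8"

// Split into the five fields used throughout this article
val Array(timestamp, province, city, userid, advid) = line.split(",")
// timestamp = "1527426000000", province = "guangdong", city = "shenzhen",
// userid = "1001", advid = "8"[/mw_shl_code]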
2. Blacklist Filtering
[mw_shl_code=scala,true]import kafka.serializer.StringDecoder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by jenrey on 2018/5/27 21:07
*/
object AdvApplicationTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("AdvApplicationTest")
conf.setMaster("local")
conf.set("", "") //序列化
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
/**
* TODO: Step 1: read data from Kafka (direct approach)
*/
/* K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]*/
val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")
val topics = Set("aura")
val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
//TODO: if [a single user] clicks [a given ad] more than [100 times] [within one day], that user is a blacklist user and their records are excluded from the statistics
/**
* TODO: Step 2: filter out blacklisted users
*/
val filterLogDStream: DStream[String] = blackListFilter(logDStream, ssc)
/**
* TODO: Step 3: dynamically generate the blacklist
*/
/**
* TODO: Step 4: count ad clicks per day, per province and per city, in real time
*/
/**
* TODO: Step 5: compute the daily hot advertisements per province in real time
*/
/**
* TODO: Step 6: compute each advertisement's click trend over the last hour with a sliding window, in real time
*/
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
/**
* Filters out records produced by blacklisted users
*
* @param logDStream data read from Kafka
* @return the data after blacklist filtering
*/
def blackListFilter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {
//In a real job the blacklist would be read from a persisted store; the usual choices are Redis, HBase and MySQL
val blackList = List((1L, true), (2L, true), (3L, true))
//Turn the blacklist into an RDD
val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)
//Broadcast the blacklist
val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())
//transform operates on every RDD inside the incoming DStream
logDStream.transform(rdd => {
//Split each incoming line into a (userid, line) pair
val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {
val fields: Array[String] = line.split(",")
(fields(3).toLong, line)
})
//Note: after broadcasting, use .value to read back the broadcast content
val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
/**
* List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
* List((1L, true), (2L, true), (3L, true))
* leftOuterJoin (both sides must be key-value pairs) produces:
* (22,(qwe,None))
* (3,(zxc,Some(true)))
* (2,(asd,Some(true)))
*/
val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
//Return value: the records that survive blacklist filtering
resultRDD.filter(tuple=>{
tuple._2._2.isEmpty
}).map(_._2._1)
})
}
}[/mw_shl_code]
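The hard-coded blackList above stands in for a lookup against the persisted black_list table. A minimal sketch of loading it through Spark's JDBC reader instead (the connection settings mirror the ones used later in this article and are assumptions, as is the userid column name):
[mw_shl_code=scala,true]import org.apache.spark.sql.SparkSession

// Sketch: read the persisted blacklist instead of hard-coding it
def loadBlackList(spark: SparkSession): Array[(Long, Boolean)] = {
  spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/aura")
    .option("user", "aura")
    .option("password", "aura")
    .option("dbtable", "black_list")
    .load()
    .select("userid")
    .distinct()
    .rdd
    .map(row => (row.getLong(0), true)) // same (userid, flag) shape as the hard-coded list
    .collect()
}[/mw_shl_code]
The returned array can then be broadcast exactly as blackListBroadcast is above.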
3. Dynamically Generating the Blacklist
[mw_shl_code=scala,true]import java.util.{Date, Properties}
import kafka.serializer.StringDecoder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import utils.{ConnectionPool, DateUtils}
/**
* Created by jenrey on 2018/5/27 21:07
*/
object AdvApplicationTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("AdvApplicationTest")
conf.setMaster("local")
conf.set("", "") //序列化
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
/**
* TODO: Step 1: read data from Kafka (direct approach)
*/
/* K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]*/
val kafkaParams = Map("metadata.broker.list" -> "hadoop04:9092")
val topics = Set("aura")
val logDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
//TODO: if [a single user] clicks [a given ad] more than [100 times] [within one day], that user is a blacklist user and their records are excluded from the statistics
/**
* TODO: Step 2: filter out blacklisted users
*/
val filterLogDStream: DStream[String] = blackListFilter(logDStream, ssc)
/**
* TODO: Step 3: dynamically generate the blacklist in real time
*/
DynamicGenerationBlacklists(filterLogDStream, spark)
/**
* TODO: Step 4: count ad clicks per day, per province and per city, in real time
*/
/**
* TODO: Step 5: compute the daily hot advertisements per province in real time
*/
/**
* TODO: Step 6: compute each advertisement's click trend over the last hour with a sliding window, in real time
*/
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
/**
* TODO: filters out records produced by blacklisted users
*
* @param logDStream data read from Kafka
* @return the data after blacklist filtering
*/
def blackListFilter(logDStream: DStream[String], ssc: StreamingContext): DStream[String] = {
//In a real job the blacklist would be read from a persisted store; the usual choices are Redis, HBase and MySQL
val blackList = List((1L, true), (2L, true), (3L, true))
//Turn the blacklist into an RDD
val blackListRDD: RDD[(Long, Boolean)] = ssc.sparkContext.parallelize(blackList)
//Broadcast the blacklist
val blackListBroadcast: Broadcast[Array[(Long, Boolean)]] = ssc.sparkContext.broadcast(blackListRDD.collect())
//transform operates on every RDD inside the incoming DStream
logDStream.transform(rdd => {
//Split each incoming line into a (userid, line) pair
val user_lineRDD: RDD[(Long, String)] = rdd.map(line => {
val fields: Array[String] = line.split(",")
(fields(3).toLong, line)
})
//Note: after broadcasting, use .value to read back the broadcast content
val blackRDD: RDD[(Long, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
/**
* List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
* List((1L, true), (2L, true), (3L, true))
* leftOuterJoin (both sides must be key-value pairs) produces:
* (22,(qwe,None))
* (3,(zxc,Some(true)))
* (2,(asd,Some(true)))
*/
val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
//Return value: the records that survive blacklist filtering
resultRDD.filter(tuple => {
tuple._2._2.isEmpty
}).map(_._2._1)
})
}
/**
* TODO: dynamically generate the blacklist
*
* @param filterLogDStream the data after blacklist filtering
* If [a single user] clicks [a given ad] more than [100 times] [within one day], that user is a blacklist user
* Three possible implementations: 1) updateStateByKey 2) reduceByKey plus HBase 3) MySQL (used here)
*/
def DynamicGenerationBlacklists(filterLogDStream: DStream[String], spark: SparkSession): Unit = {
val date_userid_advid_ds: DStream[(String, Long)] = filterLogDStream.map(line => {
val fields: Array[String] = line.split(",")
val time = new Date(fields(0).toLong)
val date: String = DateUtils.formatDateKey(time)
val userid: String = fields(3)
val advid: String = fields(4)
(date + "_" + userid + "_" + advid, 1L)
}).reduceByKey(_ + _)
date_userid_advid_ds.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
//ConnectionPool is a pre-written utility class that manages MySQL connections
val connection = ConnectionPool.getConnection()
val statement = connection.createStatement()
partition.foreach {
case (date_userid_advid, count) => {
val fields = date_userid_advid.split("_")
val date = fields(0)
val userid = fields(1).toLong
val advid = fields(2).toLong
val sql = s"insert into aura.tmp_advclick_count values($date,$userid,$advid,$count)";
statement.execute(sql);
}
}
ConnectionPool.returnConnection(connection)
})
})
/**
* Generate the blacklist
*/
val df: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/aura")
.option("user", "aura")
.option("password", "aura")
.option("dbtable", "tmp_advclick_count")
.load()
df.createOrReplaceTempView("tmp_advclick_count")
val sql =
"""
select
userid
from
(
select
date,userid,advid,sum(click_count) c_count
from
tmp_advclick_count
group by date,userid,advid
) t
where
t.c_count>100
"""
val blacklistdf= spark.sql(sql).distinct()
val properties = new Properties()
properties.put("user","aura")
properties.put("password","aura")
blacklistdf.write.mode(SaveMode.Append)
.jdbc("jdbc:mysql://localhost:3306/aura","black_list",properties)
}
}[/mw_shl_code]
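The scaladoc above lists updateStateByKey as the first alternative to the MySQL round trip. A minimal sketch of that variant, assuming the same (date_userid_advid, count) pairs produced by the map/reduceByKey step and that ssc.checkpoint(...) has been enabled (both are assumptions):
[mw_shl_code=scala,true]import org.apache.spark.streaming.dstream.DStream

// Sketch of approach 1: keep a running per-day click count in Spark state
// (updateStateByKey requires ssc.checkpoint(...) to be set on the StreamingContext)
def blacklistByState(date_userid_advid_ds: DStream[(String, Long)]): DStream[String] = {
  val updateFunc = (newCounts: Seq[Long], state: Option[Long]) => Some(newCounts.sum + state.getOrElse(0L))
  date_userid_advid_ds
    .updateStateByKey(updateFunc)               // running total per (date, userid, advid)
    .filter { case (_, total) => total > 100 }  // keep only keys past the 100-click threshold
    .map { case (key, _) => key.split("_")(1) } // extract the userid
}[/mw_shl_code]
Each batch, the resulting userids could be written to the black_list table in place of the SQL aggregation shown above.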
4. Real-Time Count of Ad Clicks per Day, per Province and per City
Append the following code after the code above.
[mw_shl_code=scala,true]/**
* Count ad clicks per day, per province and per city, in real time
*
* @param filterLogDStream the data after blacklist filtering
*/
def ProvinceCityAdvClick_Count(filterLogDStream: DStream[String]): DStream[(String, Long)] = {
//updateStateByKey keeps a running total per key; it requires ssc.checkpoint(...) to be set on the StreamingContext
val f = (input: Seq[Long], state: Option[Long]) => {
val current_count = input.sum
val last_count = state.getOrElse(0L)
Some(current_count + last_count)
}
filterLogDStream.map(line => {
val fields = line.split(",")
val time = fields(0).toLong
val mydate = new Date(time)
val date = DateUtils.formatDateKey(mydate)
val province = fields(1)
val city = fields(2)
val advid = fields(4)
(date + "_" + province + "_" + city + "_" + advid, 1L)
}).updateStateByKey(f)
/**
* If required, these results can also be persisted to MySQL or HBase
*/
}[/mw_shl_code]
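As the closing comment says, the per-city counts can be pushed to an external store each batch. A minimal sketch using foreachRDD and the same ConnectionPool helper used earlier (the table name aura.tmp_province_city_count is hypothetical):
[mw_shl_code=scala,true]import org.apache.spark.streaming.dstream.DStream
import utils.ConnectionPool

// Sketch: persist the (date_province_city_advid, count) stream to MySQL once per batch
def saveProvinceCityCounts(counts: DStream[(String, Long)]): Unit = {
  counts.foreachRDD(rdd => {
    rdd.foreachPartition(partition => {
      val connection = ConnectionPool.getConnection()
      val statement = connection.createStatement()
      partition.foreach { case (key, count) =>
        val Array(date, province, city, advid) = key.split("_")
        // aura.tmp_province_city_count is an assumed target table
        statement.execute(
          s"insert into aura.tmp_province_city_count values('$date','$province','$city',$advid,$count)")
      }
      ConnectionPool.returnConnection(connection)
    })
  })
}[/mw_shl_code]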
5. Real-Time Hot Advertisements per Province
[mw_shl_code=scala,true] /**
* Compute the hot advertisements per province in real time
*
* transform : rdd -> dataframe -> table -> sql
*
* @param date_province_city_advid_count the per-city counts produced in step 4
*/
def ProvinceAdvClick_Count(date_province_city_advid_count: DStream[(String, Long)], spark: SparkSession): Unit = {
date_province_city_advid_count.transform(rdd => {
val date_province_advid_count = rdd.map {
case (date_province_city_advid, count) => {
val fields = date_province_city_advid.split("_")
val date = fields(0)
val province = fields(1)
val advid = fields(3)
(date + "_" + province + "_" + advid, count)
}
}.reduceByKey(_ + _)
val rowRDD = date_province_advid_count.map(tuple => {
val fields = tuple._1.split("_")
val date = fields(0)
val province = fields(1)
val advid = fields(2).toLong
val count = tuple._2
Row(date, province, advid, count)
})
val schema = StructType(
StructField("date", StringType, true) ::
StructField("province", StringType, true) ::
StructField("advid", LongType, true) ::
StructField("count", LongType, true) :: Nil
)
val df = spark.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("temp_date_province_adv_count")
val sql =
"""
select
*
from
(
select
date,province,advid,count,row_number() over(partition by province order by count desc) rank
from
temp_date_province_adv_count
) temp
where temp.rank < 10
"""
/**
* Persist the result to a database here; note that transform is lazy, so an
* output operation must consume the returned DStream for this SQL to actually run
*/
spark.sql(sql)
rdd
})
}[/mw_shl_code]
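Step 6 (each advertisement's click trend over the last hour) is listed as a TODO but never implemented in this article. A minimal sketch using reduceByKeyAndWindow, assuming the same comma-separated records and the 5-second batch interval used above (the one-hour window and 10-second slide are illustrative choices):
[mw_shl_code=scala,true]import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream

// Sketch of step 6: clicks per advertisement over the last hour, recomputed as the window slides
def AdvClickTrend_LastHour(filterLogDStream: DStream[String]): DStream[(String, Long)] = {
  filterLogDStream.map(line => {
    val fields = line.split(",")
    (fields(4), 1L)                      // key by advid
  }).reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,         // combine clicks inside the window
    Minutes(60),                         // window length: the last hour
    Seconds(10))                         // slide interval
}[/mw_shl_code]
The inverse-reduce overload of reduceByKeyAndWindow is more efficient for long windows, at the cost of requiring checkpointing.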
6. Complete Code
[mw_shl_code=scala,true]package sparkstreaming.lesson09
import java.sql.Date
import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import sparkstreaming.demo.lesson01.ConnectionPool
import sparkstreaming.demo.utils.DateUtils
/**
* Created by Administrator on 2018/5/12.
*
* timestamp:
* the time at which the user clicked the ad
* province:
* the province in which the user clicked the ad
* city:
* the city in which the user clicked the ad
* userid:
* the unique identifier of the user
* advid:
* the id of the clicked advertisement
*/
object AdvApplicationTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("AdvApplicationTest")
conf.set("","") //序列化
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(5))
val spark = SparkSession.builder()
.config(conf).getOrCreate()
/**
* Step 1: read data from Kafka (direct approach)
* K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
*/
val kafkaParams = Map("metadata.broker.list" -> "hadoop1:9092")
val topics = Set("aura")
val logDstream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics).map(_._2)
/**
* Step 2: filter out blacklisted users
*/
val filterLogDStream: DStream[String] = blackListFilter(logDstream,ssc)
/**
* If [a single user] clicks [a given ad] more than [100 times] [within one day], that user is a blacklist user
*
*
* zhangsan:
* A:50 B:60
* lisi:
* A:50 A:20 A:40  -> lisi is a blacklist user (110 clicks on ad A)
* Is a user who is blacklisted today still blacklisted tomorrow?
* That depends on the business rules.
*
* Step 3: dynamically generate the blacklist in real time
*/
DynamicGenerationBlacklists(filterLogDStream,spark)
/**
* Step 4:
* count ad clicks per day, per province and per city, in real time
*/
val dateProvinceCityAdvClick_Count = ProvinceCityAdvClick_Count(filterLogDStream)
/**
* Step 5:
* compute the daily hot advertisements per province in real time
* (top-N within each group)
*
* transform / foreachRDD
* rdd => dataframe
* SparkSQL:
* SQL
*/
/**
* Step 6:
* compute each advertisement's click trend over the last hour with a sliding window, in real time
*/
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
/**
* Filters out records produced by blacklisted users
* @param logDstream data read from Kafka
* @return the data after blacklist filtering
*/
def blackListFilter(logDstream: DStream[String],ssc:StreamingContext):DStream[String]={
/**
* In a real job this should be read from the persisted
* black_list table
*/
val blackList = List((1L,true),(2L,true),(3L,true))
val blackListRDD = ssc.sparkContext.parallelize(blackList)
val balckListBroadcast = ssc.sparkContext.broadcast(blackListRDD.collect())
/**
* The blacklist should come from a persisted store; the three common choices are:
* 1) Redis
* 2) HBase
* 3) MySQL (the approach demonstrated here)
* It can be read either with Spark Core directly,
* or via Spark SQL -> DataFrame -> RDD
*/
logDstream.transform( rdd =>{
val user_lineRDD=rdd.map( line =>{
val fields = line.split(",")
(fields(3).toLong,line)
})
val blackRDD = rdd.sparkContext.parallelize(balckListBroadcast.value)
val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
resultRDD.filter( tuple =>{
tuple._2._2.isEmpty
}).map(_._2._1)
})
}
/**
* Dynamically generates the blacklist
* @param filterLogDStream the data after blacklist filtering
* If [a single user] clicks [a given ad] more than [100 times] [within one day], that user is a blacklist user
*
* The idea:
* this requirement is very much like word count: count, in real time, how many times each key occurs,
* and once a key reaches 100 occurrences it becomes a blacklist entry
* Approach 1:
* map to (date_userid_advid, 1)
* keep a running count per key with updateStateByKey (heavier on memory)
* zhangsan A 80
* lisi B 99
* once a count passes 100,
* filter those keys out and write them to MySQL, Redis or HBase
* Approach 2:
* map to (date_userid_advid, 1)
* aggregate only the current batch with reduceByKey (lighter on memory)
* HBase:
* rowkey date_userid_advid holds 2, the batch adds 3,
* the stored counter becomes 5
* (Redis can be used the same way)
* Approach 3:
* the MySQL approach (used below)
*/
def DynamicGenerationBlacklists(filterLogDStream: DStream[String],spark:SparkSession):Unit={
val date_userid_advid_ds=filterLogDStream.map( line =>{
val fields = line.split(",")
val time = new Date( fields(0).toLong)
val date = DateUtils.formatDateKey(time)
val userid = fields(3)
val advid = fields(4)
//20180512_
(date+"_"+userid+"_"+advid,1L)
}).reduceByKey(_+_)
date_userid_advid_ds.foreachRDD( rdd =>{
rdd.foreachPartition( partition =>{
val connection = ConnectionPool.getConnection()
val statement = connection.createStatement()
partition.foreach{
case(date_userid_advid,count) =>{
val fields = date_userid_advid.split("_")
val date = fields(0)
val userid = fields(1).toLong
val advid = fields(2).toLong
val sql=s"insert into aura.tmp_advclick_count values($date,$userid,$advid,$count)";
statement.execute(sql);
}
}
ConnectionPool.returnConnection(connection)
})
})
/**
* Generate the blacklist
*/
val df: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/aura")
.option("user", "aura")
.option("password", "aura")
.option("dbtable", "tmp_advclick_count")
.load()
df.createOrReplaceTempView("tmp_advclick_count")
val sql=
"""
SELECT
userid
FROM
(
SELECT
date,userid,advid,sum(click_count) c_count
FROM
tmp_advclick_count
GROUP BY
date,userid,advid
) t
WHERE
t.c_count > 100
"""
//The computed blacklist
val blacklistdf = spark.sql(sql).distinct()
val properties = new Properties()
properties.put("user","aura")
properties.put("password","aura")
blacklistdf.write.mode(SaveMode.Append)
.jdbc("jdbc:mysql://localhost:3306/aura","black_list",properties)
}
/**
* Count ad clicks per day, per province and per city, in real time
* @param filterLogDStream
*/
def ProvinceCityAdvClick_Count(filterLogDStream: DStream[String]):DStream[(String,Long)]={
/**
* The idea:
* map => (k, v) => (date_province_city_advid, 1)
* then updateStateByKey
*/
//updateStateByKey keeps a running total per key; it requires ssc.checkpoint(...) to be set on the StreamingContext
val f = (input: Seq[Long], state: Option[Long]) => {
val current_count = input.sum
val last_count = state.getOrElse(0L)
Some(current_count + last_count)
}
filterLogDStream.map( line =>{
val fields = line.split(",")
val time = fields(0).toLong
val mydate = new Date(time)
val date = DateUtils.formatDateKey(mydate)
val province = fields(1)
val city = fields(2)
val advid = fields(4)
(date+"_"+province+"_"+city+"_"+advid,1L)
}).updateStateByKey(f)
/**
* If required, these results can also be persisted to MySQL or HBase
*/
}
/**
* Compute the hot advertisements per province in real time
*
* transform : rdd -> dataframe -> table -> sql
* @param date_province_city_advid_count
*/
def ProvinceAdvClick_Count(date_province_city_advid_count:DStream[(String,Long)],spark:SparkSession): Unit ={
date_province_city_advid_count.transform( rdd =>{
val date_province_advid_count = rdd.map{
case(date_province_city_advid,count) =>{
val fields = date_province_city_advid.split("_")
val date = fields(0)
val province = fields(1)
val advid = fields(3)
(date+"_"+province+"_"+advid,count)
}
}.reduceByKey(_+_)
val rowRDD=date_province_advid_count.map( tuple =>{
val fields = tuple._1.split("_")
val date = fields(0)
val province = fields(1)
val advid = fields(2).toLong
val count = tuple._2
Row(date,province,advid,count)
})
val schema=StructType(
StructField("date",StringType,true)::
StructField("province",StringType,true)::
StructField("advid",LongType,true)::
StructField("count",LongType,true):: Nil
)
val df = spark.createDataFrame(rowRDD,schema)
df.createOrReplaceTempView("temp_date_province_adv_count")
val sql=
"""
select
*
from
(
select
date,province,advid,count,row_number() over(partition by province order by count desc) rank
from
temp_date_province_adv_count
) temp
where temp.rank < 10
"""
/**
* Persist the result to a database here; note that transform is lazy, so an
* output operation must consume the returned DStream for this SQL to actually run
*/
spark.sql(sql)
rdd
})
}
}[/mw_shl_code]
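One detail worth calling out: ProvinceCityAdvClick_Count relies on updateStateByKey, which will not run unless a checkpoint directory has been configured on the StreamingContext. A one-line sketch of what main additionally needs (the HDFS path is an assumption):
[mw_shl_code=scala,true]// Required for updateStateByKey; without it the job fails at startup
// complaining that the checkpoint directory has not been set (path below is an assumption)
ssc.checkpoint("hdfs://hadoop1:9000/adv_checkpoint")[/mw_shl_code]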
Original article:
https://blog.csdn.net/JENREY/article/details/80472383