Last edited by PeersLee on 2017-3-20 17:56
Guiding questions:
1. What are the commonly used RDD APIs in Spark?
2. How do you implement the "regional forum access ratio" demo?
Solution:
Commonly used Spark RDD APIs explained
map
```scala
val rdd = sc.parallelize(1 to 10)
/*
map:
1. Applies the function f to every element of rdd and returns a new RDD (mapRdd)
2. Does not change the number of partitions
*/
val mapRdd = rdd.map(_ * 2)
println(mapRdd.collect().toBuffer)
// ArrayBuffer(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
```
reduce
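reduce passes the RDD's elements to the input function two at a time; the result is passed back in together with the next element, and so on until a single value remains. Because Spark first reduces each partition separately and then combines the partial results, the function should be commutative and associative.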
```
scala> val c = sc.parallelize(1 to 10)
scala> c.reduce((x, y) => x + y)
res4: Int = 55
```
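To see why associativity matters, here is a plain-Scala model (no Spark needed) of the two-level reduce: each "partition" is an ordinary Seq that is reduced on its own, and the partial results are then reduced again. The four-way split is hand-picked for illustration; it is a sketch of the idea, not Spark's implementation.

```scala
// Plain-Scala model of RDD.reduce: reduce inside each partition,
// then reduce the per-partition results. Not Spark's actual code.
object ReduceModel {
  def reduceLike[T](partitions: Seq[Seq[T]])(f: (T, T) => T): T =
    partitions
      .filter(_.nonEmpty) // empty partitions contribute nothing
      .map(_.reduce(f))   // step 1: reduce within each partition
      .reduce(f)          // step 2: combine the partial results

  def main(args: Array[String]): Unit = {
    val onePartition = Seq((1 to 10).toSeq)
    val fourPartitions = Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6, 7), Seq(8, 9, 10))

    // Addition is associative and commutative: the split does not matter.
    println(reduceLike(onePartition)(_ + _))   // 55
    println(reduceLike(fourPartitions)(_ + _)) // 55

    // Subtraction is not associative: the answer depends on the split.
    println(reduceLike(onePartition)(_ - _))   // -53
    println(reduceLike(fourPartitions)(_ - _)) // 17
  }
}
```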
flatMap
```scala
val rdd = sc.textFile("file:/home/peerslee/data.txt")
// Visit each element (line), but return zero or more elements per line: the words, split on runs of whitespace
val flatMapRdd = rdd.flatMap(line => line.split("\\s+"))
println(flatMapRdd.collect().toBuffer)
// ArrayBuffer(hello, spark, rdd, fire, fight)
```
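The map/flatMap distinction is the same as on ordinary Scala collections, so it can be tried without a cluster. In the sketch below the two input lines are made up; it also shows why the split pattern matters: `\\s` splits on each single whitespace character, so two consecutive spaces produce an empty "word", while `\\s+` treats a run of whitespace as one separator.

```scala
object FlatMapModel {
  // Made-up input lines; note the double space in the second one.
  val lines = Seq("hello spark", "rdd  fire fight")

  // map: exactly one output element (an Array of words) per input line
  val mapped: Seq[Array[String]] = lines.map(_.split("\\s+"))

  // flatMap: the per-line word arrays are flattened into one sequence
  val flatMapped: Seq[String] = lines.flatMap(_.split("\\s+"))

  // Splitting on a single whitespace character keeps an empty string
  // between the two consecutive spaces.
  val singleSplit: Seq[String] = lines.flatMap(_.split("\\s"))

  def main(args: Array[String]): Unit = {
    println(mapped.map(_.mkString("[", ",", "]"))) // List([hello,spark], [rdd,fire,fight])
    println(flatMapped)  // List(hello, spark, rdd, fire, fight)
    println(singleSplit) // List(hello, spark, rdd, , fire, fight)
  }
}
```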
filter
```scala
val rdd = sc.parallelize(1 to 10)
/*
f returns a Boolean; the elements for which it returns true form a new RDD.
*/
val filterRdd = rdd.filter(_ % 2 == 0)
println(filterRdd.collect().toBuffer)
// ArrayBuffer(2, 4, 6, 8, 10)
```
mapPartitions
```scala
val rdd = sc.parallelize(1 to 10)
/*
1. Like map, but
2. f is called once per partition and receives an Iterator over all elements of that partition
*/
val mapPartitionsRdd = rdd.mapPartitions(iter => iter.filter(_ % 2 == 1))
println(mapPartitionsRdd.collect().toBuffer)
// ArrayBuffer(1, 3, 5, 7, 9)
```
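Because f runs once per partition rather than once per element, mapPartitions is the natural place for per-partition work such as one-time setup or pre-aggregation. The plain-Scala model below uses hand-made partitions and a counter to show how often the function is called; `mapPartitionsLike` is a hypothetical helper, not a Spark API.

```scala
object MapPartitionsModel {
  // Model: apply f once to an Iterator over each partition.
  def mapPartitionsLike[T, U](parts: Seq[Seq[T]])(f: Iterator[T] => Iterator[U]): Seq[Seq[U]] =
    parts.map(p => f(p.iterator).toList)

  // Hand-made partitions standing in for an RDD's partitions.
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6, 7), Seq(8, 9, 10))

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Emit one partial sum per partition; f runs 4 times, not 10.
    val sums = mapPartitionsLike(partitions) { iter =>
      calls += 1
      Iterator(iter.sum)
    }
    println(sums)  // List(List(3), List(12), List(13), List(27))
    println(calls) // 4
  }
}
```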
glom
```scala
val rdd = sc.parallelize(1 to 10, 4)
// glom turns each partition into an array
val glomRdd = rdd.glom()
// Collect to the driver so the partitions print in order. Calling foreach on
// the RDD itself prints on the executors, where the output of parallel tasks
// can interleave with each other and with the log messages.
glomRdd.collect().foreach(arr => println(arr.mkString(" ")))
// 1 2
// 3 4 5
// 6 7
// 8 9 10
```
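Which elements end up in the same partition depends on how parallelize slices the collection. As far as I know from Spark's source, ParallelCollectionRDD cuts a sequence of length n into k slices at the boundaries i*n/k (integer division); the plain-Scala sketch below reproduces that rule, so treat the formula as an assumption. For 1 to 10 and 4 slices it yields [1, 2], [3, 4, 5], [6, 7], [8, 9, 10].

```scala
object SliceModel {
  // Slice i covers indices [i*n/k, (i+1)*n/k), using Long arithmetic to avoid overflow.
  def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  // Split a sequence into numSlices contiguous slices using those boundaries.
  def sliceLike[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] =
    positions(seq.length.toLong, numSlices).map { case (start, end) => seq.slice(start, end) }

  def main(args: Array[String]): Unit = {
    // Prints: "1 2", "3 4 5", "6 7", "8 9 10"
    sliceLike(1 to 10, 4).foreach(part => println(part.mkString(" ")))
  }
}
```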
distinct
```scala
val rdd = sc.parallelize(Array(1,1,1,2,3,3,4,4,4,4,5))
/*
1. Removes duplicates
2. The argument sets the number of partitions of the result (here 2)
*/
val distinctRdd = rdd.distinct(2)
println(distinctRdd.collect().toBuffer)
// ArrayBuffer(4, 2, 1, 3, 5)  (the order is not guaranteed)
```
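distinct(numPartitions) involves a shuffle: in the Spark versions I know, each element is routed to a partition by hash (like HashPartitioner: hashCode modulo the partition count, made non-negative) and duplicates are then dropped within each partition. The plain-Scala model below reproduces only the routing and deduplication; the order in which Spark emits elements inside a partition is not modelled.

```scala
object DistinctModel {
  // Non-negative modulo, so negative hash codes still land in [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Model of distinct(numPartitions): route by hash, keep one copy per partition.
  def distinctLike[T](data: Seq[T], numPartitions: Int): Seq[Seq[T]] =
    (0 until numPartitions).map { p =>
      data.filter(x => nonNegativeMod(x.hashCode, numPartitions) == p).distinct
    }

  def main(args: Array[String]): Unit = {
    val parts = distinctLike(Seq(1, 1, 1, 2, 3, 3, 4, 4, 4, 4, 5), 2)
    // partition 0: 2, 4
    // partition 1: 1, 3, 5
    parts.zipWithIndex.foreach { case (part, i) => println(s"partition $i: ${part.mkString(", ")}") }
  }
}
```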
cartesian
```scala
val rdd1 = sc.parallelize(Array('a', 'b', 'c'))
val rdd2 = sc.parallelize(Array('A', 'B', 'C'))
/*
Computes the cartesian product of two RDDs and returns it as a new RDD
*/
val cartesianRdd = rdd1.cartesian(rdd2)
println(cartesianRdd.collect().toBuffer)
// ArrayBuffer((a,A), (a,B), (a,C), (b,A), (b,B), (b,C), (c,A), (c,B), (c,C))
```
union
```scala
val rdd1 = sc.parallelize(Array('a', 'b', 'c'))
val rdd2 = sc.parallelize(Array('A', 'B', 'C'))
/*
Merges two RDDs into one (duplicates are kept)
*/
val unionRdd = rdd1.union(rdd2)
println(unionRdd.collect().toBuffer)
// ArrayBuffer(a, b, c, A, B, C)
```
mapValues
```scala
val rdd = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
/*
For a key-value RDD, applies f to each value only and returns a new RDD; the keys are unchanged
*/
val mapValuesRdd = rdd.mapValues(num => num * 2)
println(mapValuesRdd.collect().toBuffer)
// ArrayBuffer((A,2), (B,4), (C,6))
```
subtract
```scala
val rdd1 = sc.parallelize(Array("A", "B", "C", "a", "b", "c"))
val rdd2 = sc.parallelize(Array("A", "B", "C", "D"))
/*
Returns the elements that are in rdd1 but not in rdd2
*/
val subtractRdd1 = rdd1.subtract(rdd2)
println(subtractRdd1.collect().toBuffer)
// ArrayBuffer(a, b, c)
val subtractRdd2 = rdd2.subtract(rdd1)
println(subtractRdd2.collect().toBuffer)
// ArrayBuffer(D)
```
sample
```scala
val rdd = sc.parallelize(Array("A", "B", "C", "a", "b"))
/*
Randomly samples elements.
Arguments: withReplacement, fraction (the expected share of elements), seed.
Returns an RDD.
*/
val sampleRdd = rdd.sample(true, 0.6, 2)
println(sampleRdd.collect().toBuffer)
// ArrayBuffer(B, C, b)
```
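The fraction is an expected share, not an exact one. Without replacement, Spark keeps each element independently with probability fraction (Bernoulli sampling); with replacement it instead draws a Poisson-distributed count per element, which is why an element can appear more than once. The sketch below models only the without-replacement case using scala.util.Random, so it will not reproduce Spark's exact picks for a given seed.

```scala
import scala.util.Random

object SampleModel {
  // Bernoulli sampling: keep each element independently with probability `fraction`.
  def bernoulliSample[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed)
    data.filter(_ => rng.nextDouble() < fraction)
  }

  def main(args: Array[String]): Unit = {
    val data = Seq("A", "B", "C", "a", "b")
    // The result size varies around 0.6 * 5 = 3 from seed to seed.
    (1L to 5L).foreach(seed => println(bernoulliSample(data, 0.6, seed)))
  }
}
```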
takeSample
```scala
val rdd = sc.parallelize(Array("A", "B", "C", "a", "b"))
/*
Like sample, but
takes the exact number of elements to return,
and returns an Array to the driver (it is an action)
*/
val takeSampleArr = rdd.takeSample(true, 2, 1)
println(takeSampleArr.toBuffer)
// ArrayBuffer(a, b)
```
groupBy
```scala
val rdd = sc.parallelize(Array("A1", "B1", "C1", "A2", "B2", "A3"))
/*
Groups the elements by the key produced by f
*/
val groupByRdd = rdd.groupBy(_.substring(0, 1))
println(groupByRdd.collect().toBuffer)
// ArrayBuffer((A,CompactBuffer(A1, A2, A3)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C1)))
```
partitionBy
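Applies to key-value RDDs.
Repartitions the RDD with the given Partitioner.
If the RDD already has an equal partitioner, the RDD itself is returned unchanged.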
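The usual Partitioner passed to partitionBy is HashPartitioner(n), which puts a record in partition key.hashCode mod n, adjusted to be non-negative, with null keys going to partition 0. The plain-Scala model below reproduces that rule so you can see which keys end up together; the name getPartition mirrors Spark's Partitioner method, but the object itself is only an illustration.

```scala
object HashPartitionModel {
  // Mirrors the rule used by HashPartitioner.getPartition.
  def getPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _ =>
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
  }

  // Group key-value pairs by their target partition (what partitionBy arranges).
  def partitionByLike[K, V](data: Seq[(K, V)], numPartitions: Int): Seq[Seq[(K, V)]] =
    (0 until numPartitions).map { p =>
      data.filter { case (k, _) => getPartition(k, numPartitions) == p }
    }

  def main(args: Array[String]): Unit = {
    val data = Seq(('a', 1), ('b', 2), ('c', 3), ('d', 4))
    // 'a' has hashCode 97, so it lands in partition 97 % 2 = 1.
    partitionByLike(data, 2).zipWithIndex.foreach { case (part, i) => println(s"partition $i: $part") }
  }
}
```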
cogroup
```scala
val rdd1 = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 1), ('b',2), ('c', 1)))
val rdd2 = sc.parallelize(Array(('a', 11), ('a', 22), ('b', 11), ('b',22), ('c', 11)))
/*
1. For key-value RDDs
2. For each key in either RDD, collects that key's values from both RDDs into
   (key, (values from rdd1, values from rdd2)) and returns a new RDD
*/
val cogroupRdd = rdd1.cogroup(rdd2)
println(cogroupRdd.collect().toBuffer)
// ArrayBuffer((a,(CompactBuffer(1, 2),CompactBuffer(11, 22))), (b,(CompactBuffer(1, 2),CompactBuffer(11, 22))), (c,(CompactBuffer(1),CompactBuffer(11))))
```
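A key that appears on only one side still shows up in the cogroup result, with an empty collection for the other side. The plain-Scala model below makes that visible on local sequences; `cogroupLike` is a hypothetical helper, and Spark's real cogroup shuffles both RDDs with a common partitioner instead of scanning lists.

```scala
object CogroupModel {
  // Model of cogroup: for every key on either side, gather that key's values
  // from the left and from the right (an empty Seq if the key is missing there).
  def cogroupLike[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val vs = left.filter(_._1 == k).map(_._2)
      val ws = right.filter(_._1 == k).map(_._2)
      (k, (vs, ws))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val grouped = cogroupLike(Seq(('a', 1), ('a', 2), ('b', 1)), Seq(('a', 11), ('c', 33)))
    // 'b' has no right-hand values and 'c' has no left-hand values.
    grouped.foreach(println)
  }
}
```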
combineByKey
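```scala
val rdd = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 3), ('b',4), ('c', 5)))
/*
Combines the values of each key, first within each partition, then across partitions:
createCombiner turns the first value of a key into a List,
mergeValue adds further values of that key from the same partition,
mergeCombiners concatenates the lists from different partitions.
Because v :: c prepends, the order inside each list depends on the partitioning.
*/
val combineByKeyRdd = rdd.combineByKey((v : Int) => List(v),
  (c : List[Int], v : Int) => v :: c,
  (c1 : List[Int], c2 : List[Int]) => c1 ::: c2)
println(combineByKeyRdd.collect().toBuffer)
// ArrayBuffer((a,List(1, 2)), (b,List(3, 4)), (c,List(5)))
```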
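The three combineByKey functions are applied in a fixed order: createCombiner for the first value of a key in a partition, mergeValue for later values in that partition, and mergeCombiners across partitions. The plain-Scala model below applies them in that order to hand-made partitions; `combineByKeyLike` and `collectLists` are hypothetical names. It also shows the effect of v :: c: with all data in one partition the lists come out reversed, while with one value per key per partition they come out in input order.

```scala
object CombineByKeyModel {
  def combineByKeyLike[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Step 1: inside each partition, build one combiner per key.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    // Step 2: merge the combiners of the same key across partitions, in partition order.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.updated(k, a.get(k).map(mergeCombiners(_, c)).getOrElse(c))
      }
    }
  }

  val data = Seq(('a', 1), ('a', 2), ('b', 3), ('b', 4), ('c', 5))

  // The same three functions as in the Spark example.
  def collectLists(parts: Seq[Seq[(Char, Int)]]): Map[Char, List[Int]] =
    combineByKeyLike(parts)(
      (v: Int) => List(v),
      (c: List[Int], v: Int) => v :: c,
      (c1: List[Int], c2: List[Int]) => c1 ::: c2)

  def main(args: Array[String]): Unit = {
    println(collectLists(Seq(data))) // a -> List(2, 1), b -> List(4, 3), c -> List(5)
    println(collectLists(Seq(Seq(('a', 1)), Seq(('a', 2)), Seq(('b', 3)), Seq(('b', 4), ('c', 5)))))
  }
}
```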
reduceByKey
```scala
// For each key, combines the values with f
val rdd = sc.parallelize(Array(('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b',1), ('c', 1)))
val reduceByKeyRdd = rdd.reduceByKey((v1 : Int, v2 : Int) => v1 + v2)
println(reduceByKeyRdd.collect().toBuffer)
// ArrayBuffer((a,3), (b,2), (c,1))
```
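reduceByKey can be read as combineByKey with the value itself as the combiner, and like combineByKey it merges values inside each partition before the shuffle (map-side combine), whereas groupByKey ships every record. The plain-Scala model below counts how many records each approach would hand to the shuffle for a hand-made two-partition split of the example data; the split and the helper names are assumptions for illustration.

```scala
object ReduceByKeyModel {
  // Hand-made split of the example data into two partitions.
  val partitions: Seq[Seq[(Char, Int)]] =
    Seq(Seq(('a', 1), ('a', 1), ('a', 1)), Seq(('b', 1), ('b', 1), ('c', 1)))

  // Map-side combine: reduce the values of each key within one partition.
  def localCombine[K, V](part: Seq[(K, V)])(f: (V, V) => V): Seq[(K, V)] =
    part.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }.toSeq

  // Records shuffled without map-side combine (groupByKey) vs. with it (reduceByKey).
  val withoutCombine: Int = partitions.map(_.size).sum
  val withCombine: Int = partitions.map(p => localCombine(p)(_ + _).size).sum

  // Final result: merge the locally combined records from all partitions.
  val result: Map[Char, Int] =
    localCombine(partitions.flatMap(p => localCombine(p)(_ + _)))(_ + _).toMap

  def main(args: Array[String]): Unit = {
    println(s"shuffled records: $withoutCombine without combine, $withCombine with combine") // 6 vs 3
    println(result) // a -> 3, b -> 2, c -> 1
  }
}
```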
join
```scala
val rdd1 = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 1), ('b',2), ('c', 1)))
val rdd2 = sc.parallelize(Array(('a', 11), ('a', 22), ('b', 11), ('b',22), ('c', 11)))
// For each key present in both RDDs, returns every pairing of their values (inner join)
val joinRdd = rdd1.join(rdd2)
println(joinRdd.collect().toBuffer)
// ArrayBuffer((a,(1,11)), (a,(1,22)), (a,(2,11)), (a,(2,22)), (b,(1,11)), (b,(1,22)), (b,(2,11)), (b,(2,22)), (c,(1,11)))
```
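To the best of my knowledge of Spark's source, join is built on cogroup followed by a per-key cartesian product of the two value lists, which is why a key with m values on the left and n on the right yields m*n pairs and a key missing on either side yields none. The plain-Scala model below follows the same two steps on local sequences; `joinLike` is a hypothetical helper.

```scala
object JoinModel {
  def joinLike[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    for {
      k <- keys
      v <- left.filter(_._1 == k).map(_._2)  // all left values for k
      w <- right.filter(_._1 == k).map(_._2) // all right values for k
    } yield (k, (v, w))                      // no output if either side is empty
  }

  def main(args: Array[String]): Unit = {
    val left = Seq(('a', 1), ('a', 2), ('b', 1), ('b', 2), ('c', 1))
    val right = Seq(('a', 11), ('a', 22), ('b', 11), ('b', 22), ('c', 11), ('d', 44))
    // 4 + 4 + 1 = 9 pairs; 'd' has no left-hand match and is dropped.
    joinLike(left, right).foreach(println)
  }
}
```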
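Regional access ratio calculation
Approach:
- Load the log file into a Spark RDD.
- Load the IP rule table (ip.txt) into a Spark RDD as well, turn each line into the form we need, (start_num, end_num, province), and broadcast the collected rules.
- Use a regular expression to extract the visitor's IP from each log line, giving an RDD of visitor IPs.
- For each visitor IP, look up the region it belongs to in the IP rules, emit (region, 1), and sum the values for each key.
- Count the total number of requests, then divide each region's count by it to get the ratio.
Implementation: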
```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.matching.Regex

/**
 * Created by peerslee on 17-3-10.
 */
object IpLocationByTime {
  // Convert a dotted IPv4 address to a number: shift left 8 bits, then OR in the next octet
  def ip2num(ip : String) : Long = {
    val fragments = ip.split("\\.")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // Binary search over rules sorted by start address; returns -1 if no range contains ip
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2)) {
        return middle
      }
      if (ip < lines(middle)._1) {
        high = middle - 1
      } else {
        low = middle + 1
      }
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("IpLocationByTime").setMaster("local")
    val sc = new SparkContext(conf)

    // Load the IP location rules as (start_num, end_num, province).
    // map is a transformation: it applies f to every element and produces a new RDD.
    // Assumes ip.txt is tab-separated and sorted by start address (binary search needs this).
    val ipRulesRdd = sc.textFile("/home/peerslee/spark_data/ip.txt").map(line => {
      val fields = line.trim().split("\t")
      val start_num = ip2num(fields(0).trim())
      val end_num = ip2num(fields(1).trim())
      val province = fields(2).trim()
      (start_num, end_num, province)
    })
    // collect() returns the RDD's contents to the driver as a Scala Array
    val ipRulesArray = ipRulesRdd.collect()
    // Broadcast variable: a read-only value cached once on each machine
    val ipRulesBroadcast = sc.broadcast(ipRulesArray)

    // Extract the first IPv4 address from each log line ("" if the line has none)
    val ipPat = new Regex("((\\d{1,3}\\.){3}\\d{1,3})")
    val ipsRDD = sc.textFile("/home/peerslee/spark_data/ex17020509.log").map(line =>
      ipPat.findFirstIn(line).getOrElse("")
    )

    // Key each request by its region (or "unknown" if no rule matches), then sum per key
    val result = ipsRDD.map(ip => {
      val rules = ipRulesBroadcast.value
      val index = if (ip.isEmpty) -1 else binarySearch(rules, ip2num(ip))
      val region = if (index >= 0) rules(index)._3 else "unknown"
      (region, 1L)
    }).reduceByKey(_ + _)

    // Total number of requests (94 in the author's sample run)
    val total = result.values.reduce(_ + _)

    // Each element: (region, share of all requests)
    val rate = result.map(x => (x._1, x._2.toFloat / total))
    for (r <- rate.collect()) {
      println(r)
    }
    sc.stop()
  }
}
```
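The core of the job (IP to number, binary search over the sorted rule table, per-region counts divided by the total) does not depend on Spark, so it can be checked on local collections first. The sketch below is a hypothetical local version: ip2num is written as a fold, the two rule lines reuse ranges from the ip.txt excerpt with the region column replaced by English labels, the client IPs are made up, and lookupProvince/regionShares are illustrative names that are not part of the original code.

```scala
object IpLocationLocal {
  // Convert a dotted IPv4 address to a number: shift left 8 bits, then OR in each octet.
  def ip2num(ip: String): Long =
    ip.split("\\.").foldLeft(0L)((acc, frag) => (acc << 8) | frag.toLong)

  // Binary search over rules sorted by start address; returns -1 if no range contains ip.
  def binarySearch(rules: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = rules.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      val (start, end, _) = rules(middle)
      if (ip >= start && ip <= end) return middle
      if (ip < start) high = middle - 1 else low = middle + 1
    }
    -1
  }

  // Hypothetical helper: region for one IP, or "unknown" if no range matches.
  def lookupProvince(rules: Array[(Long, Long, String)], ip: String): String = {
    val index = binarySearch(rules, ip2num(ip))
    if (index >= 0) rules(index)._3 else "unknown"
  }

  // Hypothetical helper: each region's share of all requests.
  def regionShares(rules: Array[(Long, Long, String)], ips: Seq[String]): Map[String, Double] = {
    val counts = ips.groupBy(ip => lookupProvince(rules, ip)).map { case (region, hits) => (region, hits.size) }
    val total = counts.values.sum.toDouble
    counts.map { case (region, n) => (region, n / total) }
  }

  // Two tab-separated rule lines, sorted by start address.
  val rules: Array[(Long, Long, String)] =
    Seq("1.0.1.0\t1.0.3.255\tFujian", "1.0.8.0\t1.0.15.255\tGuangdong").map { line =>
      val fields = line.trim.split("\t")
      (ip2num(fields(0).trim), ip2num(fields(1).trim), fields(2).trim)
    }.toArray

  def main(args: Array[String]): Unit =
    // Fujian 0.25, Guangdong 0.5, unknown 0.25
    println(regionShares(rules, Seq("1.0.2.5", "1.0.9.9", "1.0.10.1", "8.8.8.8")))
}
```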
Other files:
Log sample
```
2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 352 1057 31
2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 352 1058 31
2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1057 31
2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1054 31
2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1054 31
2017-02-05 09:42:04 GET /plugin.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 925 1072 140
2017-02-05 09:42:04 GET /uc_server/data/avatar/000/00/55/20_avatar_middle.jpg 58.211.2.60 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 6430 1156 109
```
ip.txt (excerpt)
```
0.0.0.0 0.255.255.255 IANA,保留地址
1.0.0.0 1.0.0.255 澳大利亚, CZ88.NET
1.0.1.0 1.0.3.255 福建省,电信
1.0.4.0 1.0.7.255 澳大利亚, CZ88.NET
1.0.8.0 1.0.15.255 广东省,电信
1.0.16.0 1.0.31.255 日本,Beacon服务器
1.0.32.0 1.0.63.255 广东省,电信
1.0.64.0 1.0.127.255 日本,広島県中区大手町Energia通信公司
1.0.128.0 1.0.255.255 泰国, CZ88.NET
1.1.0.0 1.1.0.255 福建省,电信
1.1.1.0 1.1.1.255 澳大利亚,亚太互联网络信息中心
1.1.2.0 1.1.7.255 福建省,电信
1.1.8.0 1.1.63.255 广东省,电信
1.1.64.0 1.1.127.255 日本,东京都新宿区歌舞伎町i2ts公司
1.1.128.0 1.1.142.255 泰国,穆达汉
1.1.143.0 1.1.144.255 泰国, CZ88.NET
1.1.145.0 1.1.147.255 泰国,沙功那空
1.1.148.0 1.1.149.255 泰国, CZ88.NET
1.1.150.0 1.1.150.128 泰国,沙功那空
1.1.150.129 1.1.150.255 泰国,曼谷
1.1.151.0 1.1.151.255 泰国, CZ88.NET
1.1.152.0 1.1.152.127 泰国,曼谷
1.1.152.128 1.1.152.255 泰国,沙功那空
1.1.153.0 1.1.153.255 泰国,沙功那空廊曼
1.1.154.0 1.1.157.255 泰国,沙功那空
1.1.158.0 1.1.160.255 泰国,曼谷
```