feilong 发表于 2018-10-19 09:37:35

Spark 高级分析:第八章第10,11节 Spark会话化以及应用

本帖最后由 feilong 于 2018-10-19 09:38 编辑

问题导读

1.什么是Spark会话化?如何使用?
2.groupByKeyAndSortValues作用是什么,如何使用?
3.出租车地理数据分析的意义是什么?


关注最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg


上一篇:Spark 高级分析:第八章第9节 地理数据分析
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25571

我们的目标,从许多页前,是调查之间的关系,一个市镇的司机将乘客送达,以及需要多少时间来获得另一单。此时,taxiDone RDD包含各个记录中每个出租车司机的所有单独行程,这些记录分布在数据的不同分区上。为了计算一次行程的结束和下一次行程的开始之间的时间长度,我们需要将所有的行程从单个驾驶员的换档累计到单个记录,然后按时间对换档内的行程进行排序。排序步骤允许我们比较一次行程的下车时间到下一次行程的上车时间。这种分析,我们希望分析单个实体,因为它随时间执行一系列事件,称为会话化,通常通过web日志执行,以便分析网站的用户的行为。

会话化可以是一个非常强大的技术,用于发现数据中的洞察力,以及用于构建可用于帮助人们做出更好决策的新数据产品。例如,Google的拼写纠正引擎构建在用户活动会话之上,Google每天从其web属性上发生的每个事件(搜索、点击、地图访问等)的日志记录中构建用户活动会话。为了识别可能的拼写校正候选者,Google处理这些会话,寻找用户输入了哪些查询,点击哪些内容,在几秒钟后输入了稍微不同的查询,然后单击结果而没有返回Google的情况。然后,统计该模式对于任何一对查询的发生频率。如果它出现得足够频繁(例如,如果每次我们看到查询“untied stats”,几秒钟之后就会出现查询united states”),那么我们假设第二个查询是第一个查询的拼写纠正。

该分析利用事件日志中表示的人类行为模式,比从字典中创建的任何引擎更强大的数据中构建拼写校正引擎。引擎可以用任何语言执行拼写纠正,并且可以纠正可能未包括在任何字典中的单词(例如,新启动的名称),甚至可以纠正查询,比如“解开统计”,其中没有单词拼错!Google使用类似的技术来显示推荐的和相关的搜索,以及决定哪些查询应该返回OneBox结果,该结果给出搜索页面本身上的查询的答案,而不需要用户单击另一个页面。有针对天气,体育比赛,地址,和许多其他种类的查询的OneBox。

到目前为止,关于发生在每个实体上的一组事件的信息分布在RDD的分区中,因此,为了进行分析,我们需要将这些相关事件彼此相邻并按时间顺序排列。在下一节中,我们将展示如何使用在Spark1.2.0中引入的一些高级功能来高效地构造和分析会话。
在Spark中创建会话的方法是对要为其创建会话的标识符执行groupBy,然后根据时间戳标识符对洗牌后的事件进行排序。如果每个实体只有少量事件,那么这种方法将工作得相当好。然而,由于这种方法要求任何特定实体的所有事件同时在内存中,因此它不会随着每个实体的事件数量越来越大而扩展。我们需要一种构建会话的方法,该方法不要求同时在内存中保存特定实体的所有事件用于排序。

在MapReduce中,我们可以通过执行次要排序来构建会话,其中创建由标识符和时间戳值组成的复合键,对复合键上的所有记录进行排序,然后使用自定义分区器和分组函数来确保所有记录具有相同的ID赋值符出现在同一个输出分区中。幸运的是,Spark还可以通过利用其repartitionAndSortWithinPartitions转换来支持相同的辅助排序模式。

在库中,我们提供了一个 转换的实现groupByKeyAndSortValues。由于此功能的工作基本上与本章所涵盖的概念正交,因此这里我们省略了血淋淋的细节。Spark JIRA SPARK-3655的工作正在向Spark Core添加这样的转换。

转换接受四个参数:

[*]我们要操作的键值对的RDD。
[*]接受一个值并提取二次键进行排序的函数。
[*]一个可选的拆分函数,可以用相同的键将排序的操作分解成多组。在我们的例子中,我们将使用这个方法从同一个driver中分解多个变化。
[*]输出RDD中的分区数。

在这种情况下,我们的副主键是行程的上车时间:
def secondaryKeyFunc(trip: Trip) = trip.pickupTime.getMillis
我们需要决定我们应该用什么标准来确定何时一个轮班结束,另一个轮班开始。就像我们在本章中做出的其他一些选择(例如,过滤出持续超过3小时的行程),这是一个有点武断的选择,我们需要意识到这个选择可能如何影响我们随后分析的结果。这是一个好主意,特别是在会话分析的早期阶段,尝试许多不同的划分标准,看看我们的分析结果如何变化。一旦我们确定了一个合理的时间窗口来区分不同的班次,重要的是做出选择——即使它有点武断——并长期坚持这个选择。作为数据科学家,我们的主要兴趣是事物如何随时间变化,并且保持我们对数据和度量的定义恒定,允许我们在长期内进行有效的比较。

让我们先选择四个小时作为我们的阈值,这样相继的车之间的任何时间间隔超过该时间将被视为两个单独的班次,并且中间时间将被视为驾驶员不接受新乘客的休息时间:
def split(t1: Trip, t2: Trip): Boolean = {
val p1 = t1.pickupTime
val p2 = t2.pickupTime
val d = new Duration(p1, p2)
d.getStandardHours >= 4
}
利用第二密钥函数和分裂函数,我们可以进行分组和排序。因为这个操作触发了一个洗牌和一个公平的计算,并且我们需要不止一次地使用结果,所以我们缓存结果:
val sessions = groupByKeyAndSortValues(
taxiDone, secondaryKeyFunc, split, 30)
sessions.cache()
结果是一个RDD[(String,List]]],其中对于相同的驱动程序,所有行程都属于相同的移位,并且按时间对行程进行排序。
执行会话化流水线是一项昂贵的操作,并且会话化数据通常对于我们可能希望执行的许多不同的分析任务很有用。在您可能希望稍后了解分析或与其他数据科学家协作的设置中,最好通过只执行一次会话,然后将会话化数据写入HDFS,以便将其用于解答很多不同的问题。执行一次会话化也是在整个数据科学团队中执行会话定义的标准规则的好方法,这对于确保结果的同类比较具有相同的好处。

此时,我们准备分析我们的会话数据,看看司机在特定地区下车后要花多长时间才能找到下一单。我们将创建一个boroughDuration方法,该方法采用Trip类的两个实例,并计算第一次行程的市镇以及第一次行程的下车时间和第二次行程的上车时间之间的Duration:
def boroughDuration(t1: Trip, t2: Trip) = {
val b = borough(t1)
val d = new Duration(
t1.dropoffTime,
t2.pickupTime)
(b, d)
}
我们想把我们的新函数应用到我们的RDD中的所有顺序的Trips对。尽管我们可以编写for循环来做到这一点,但我们也可以使用Scala Collections API的滑动方法以更加有效的方式获得序列对:
val boroughDurations: RDD[(Option, Duration)] =
sessions.values.flatMap(trips => {
val iter: Iterator] = trips.sliding(2)
val viter = iter.filter(_.size == 2)
viter.map(p => boroughDuration(p(0), p(1)))
}).cache()
对滑动方法的结果的筛选器调用确保我们忽略仅包含一次行程的任何会话,并且在会话上的flatMap的结果是我们现在可以检查的RDD[(Option,Duration)]。首先,我们应该做一个验证检查,以确保大部分的持续时间是非负的:
bdrdd.values.map(_.getStandardHours).
countByValue().
toList.
sorted.
foreach(println)
...
(-2,2)
(-1,17)
(0,13367875)
(1,347479)
(2,76147)
(3,19511)
只有少数记录具有否定的持续时间,当我们更仔细地检查它们时,它们似乎没有任何共同的模式,可以用来理解错误数据的来源。我们将从持续时间分布的分析中排除这些记录,我们可以借助以前使用的Spark的StatCounter类来计算持续时间分布:
import org.apache.spark.util.StatCounter
boroughDurations.filter {
case (b, d) => d.getMillis >= 0
}.mapValues(d => {
val s = new StatCounter()
s.merge(d.getStandardSeconds)
}).
reduceByKey((a, b) => a.merge(b)).collect().foreach(println)
...
(Some(Bronx),(count: 56951, mean: 1945.79,
stdev: 1617.69, max: 14116, min: 0))
(None,(count: 57685, mean: 1922.10,
stdev: 1903.77, max: 14280, min: 0))
(Some(Queens),(count: 557826, mean: 2338.25,
stdev: 2120.98, max: 14378.000000, min: 0))
(Some(Manhattan),(count: 12505455, mean: 622.58,
stdev: 1022.34, max: 14310, min: 0))
(Some(Brooklyn),(count: 626231, mean: 1348.675465,
stdev: 1565.119331, max: 14355, min: 0))
(Some(Staten Island),(count: 2612, mean: 2612.24,
stdev: 2186.29, max: 13740, min: 0.000000))
正如我们所预期的,数据显示,在曼哈顿下车的司机最短的停车时间只有10分钟以上。在布鲁克林结束的出租车停车时间是两倍多,而在斯塔登岛结束的出租车相对较少,司机平均要花45分钟才能到达下一趟。

正如数据表明,出租车司机有一个主要的经济动机,以区别对待乘客基于他们的最终目的地,在斯塔滕岛的下降,特别是涉及大量的停车时间司机。多年来,纽约出租车和Limousine委员会作出了重大努力,以查明这种歧视,并对因拒绝乘客而遭逮捕的司机处以罚款。尝试检查异常短的出租车行程的数据会很有意思,这可能表明司机和乘客之间关于乘客想在哪里下车的争论。

下一步
设想一下,在出租车数据上使用相同的技术,以便根据当前交通模式和包含在此数据中的下一个最佳位置的历史记录,构建一个应用程序,该应用程序可以推荐出租车下车后的最佳位置。您还可以从试图搭乘出租车的人的角度来看待这些信息:给定当前时间、地点和天气数据,在接下来的5分钟内我能够从街上叫到出租车的概率是多少?这类信息可以被整合到Google Maps等应用程序中,以帮助旅行者决定何时离开,以及应该选择哪种旅行。

Esri API是一些可以帮助与基于JVM语言的地理空间数据交互的不同工具之一。另一个是GeoTrellis,Scala中的地理空间库,它可以从Spark中轻易地获得。第三是GeoTools,一个基于Java的GIS工具包。




jiangzi 发表于 2018-10-20 10:50:07

学习了, 非常好~~~

jiangzi 发表于 2018-10-20 10:51:52

学习了, 非常好~~~
页: [1]
查看完整版本: Spark 高级分析:第八章第10,11节 Spark会话化以及应用