feilong 发表于 2018-10-12 10:44:34

Spark 高级分析:第八章第9节 地理数据分析

问题导读

1.出租车数据中哪些数据可以去除?
2.处理出租车数据时如何使用前几章提到的API?
3.如何分析出租车地理数据?


关注最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg


上一篇:第八章第7,8节 准备纽约市出租车数据并处理不良记录
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25541



让我们开始检查出租车地理空间数据。对于每次行程,我们有一个经度/纬度对,表示乘客被接到的地方,另一个表示他们被送下的地方。我们希望能够确定这些经度/纬度对中的每一个属于哪个行政区,并确定没有在五个行政区中的任何一个开始或结束的行程。例如,如果出租车载乘客从曼哈顿到纽瓦克国际机场,那将是一次值得分析的有效旅程,即使它不会在五个行政区之一内结束。然而,如果看起来好像一辆出租车载着一名乘客去了南极,那么我们可以合理地确信记录是无效的,应该从我们的分析中排除。


为了执行我们的自治市镇分析,我们需要加载我们之前下载并存储在文件nyc-boroughs.geojson中的GeoJSON数据。scala.io包中的Source类使将文本文件或URL的内容作为单个String读入客户端变得容易:
val geojson = scala.io.Source.
fromFile("nyc-boroughs.geojson").
mkString
现在我们需要使用本章前面的GejJSON解析工具,使用Spray和Esri到Spark shell中,这样我们就可以将geojson字符串解析成我们的FeatureCollection类的一个实例:
import com.cloudera.science.geojson._
import GeoJsonProtocol._
import spray.json.
val features = geojson.parseJson.convertTo_
我们可以创建一个样本点来测试Esri Geometry API的功能,并验证它能够正确地识别特定点属于哪个行政区:
val p = new Point(-73.994499, 40.75066)
val borough = features.find(f => f.geometry.contains(p))
在我们使用出租车出行数据的特征之前,我们应该花点时间考虑如何组织这个地理空间数据,从而达到最大的效率。一种选择是研究为地理空间查找优化的数据结构,例如四叉树,然后找到或编写我们自己的实现。但让我们看看我们能否想出一个快速的启发式方法,让我们绕过那一点工作。

find方法将遍历FeatureCollection,直到找到几何结构包含给定经度/纬度点的特性。大多数出租车始于纽约,止于曼哈顿,所以如果代表曼哈顿的地理空间特征在序列中更早,那么大部分find调用将相对快速地返回。我们可以使用每个特性的boroughCode属性可以用作排序键,其中曼哈顿的代码等于1,斯塔滕岛的代码等于5。在每个行政区的特征中,我们希望与最大多边形相关的特征出现在较小的多边形之前,因为大多数行程将往返于每个行政区的“主要”区域。将每个特征的几何结构与boroughCode和area2D()的组合进行排序,就可以做到:
val areaSortedFeatures = features.sortBy(f => {
val borough = f("boroughCode").convertTo
(borough, -f.geometry.area2D())
})
注意,我们基于area2D()值的负值进行排序,因为我们希望最大多边形优先,而Scala在缺省情况下按升序排序。
现在我们可以将frs序列中的排序特征广播到集群,并编写一个函数,该函数使用这些特征来找出特定行程以五个行政区(如果有的话)中的哪个结束:
val bFeatures = sc.broadcast(areaSortedFeatures)
def borough(trip: Trip): Option = {
val feature: Option = bFeatures.value.find(f => {
f.geometry.contains(trip.dropoffLoc)
})
feature.map(f => {
f("borough").convertTo
})
}
如果没有一个特性包含这次旅行的dropoff_loc,那么optf的值将是None,调用None值上的映射的结果仍然是None。我们可以把这个函数应用到taxitime RDD中的行程来创建横坐标是自治市镇的行程直方图:
taxiClean.values.
map(borough).
countByValue().
foreach(println)
...
(Some(Queens),672135)
(Some(Manhattan),12978954)
(Some(Bronx),67421)
(Some(Staten Island),3338)
(Some(Brooklyn),715235)
(None,338937)
正如我们所预期的,绝大多数的旅行结束在曼哈顿自治区,而在斯塔滕岛结束的旅程相对较少。一个令人惊讶的发现是任何行政区以外结束的旅行次数;没有记录的次数远远大于在布朗克斯结束的出租车旅行次数。让我们从数据中获取一些此类旅行的例子。
taxiClean.values.
filter(t => borough(t).isEmpty).
take(10).foreach(println)
当我们打印出这些记录时,我们看到它们很大一部分在点(0.0,0.0)开始和结束,表明这些记录的行程位置丢失。我们应该从数据集中过滤这些事件,因为它们不会帮助我们进行分析。
def hasZero(trip: Trip): Boolean = {
val zero = new Point(0.0, 0.0)
(zero.equals(trip.pickupLoc) || zero.equals(trip.dropoffLoc))
}
val taxiDone = taxiClean.filter {
case (lic, trip) => !hasZero(trip)
}.cache()
当我们重新运行我们对出租车的RDD的分析时,我们看到:
taxiDone.values.
map(borough).
countByValue().
foreach(println)
...
(Some(Queens),670996)
(Some(Manhattan),12973001)
(Some(Bronx),67333)
(Some(Staten Island),3333)
(Some(Brooklyn),714775)
(None,65353)
我们的零点过滤器从输出区移除了少量的观察,但是它移除了很大一部分None条目,留下更多更合理的观测值,这些观测值已经从城市外部下降。

jiewuzhe02 发表于 2018-10-16 08:16:55

观察观察
页: [1]
查看完整版本: Spark 高级分析:第八章第9节 地理数据分析