分享

Spark 高级分析:第七章第14节 用Pregel计算平均路径长度

问题导读

1.
计算图中顶点之间的路径长度是什么样的过程
2.什么是BSP?什么是Pregel?
3.GraphX中如何计算顶点之间路径长度的?



关注最新经典文章,欢迎关注公众号



上一篇:第七章第12,13节 小世界网络
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25171


小世界网络的第二个特性是任意两个随机选择的节点之间的最短路径的长度往往很小。在本节中,我们将计算过滤图的大连通分支中包含的节点的平均路径长度。

计算图中顶点之间的路径长度是一个迭代过程,类似于我们用来寻找连通分支的迭代过程。在过程的每个阶段,每个顶点将维护它所知道的顶点的集合以及每个顶点的距离。然后,每个顶点将向其邻居查询其列表的内容,并且它将使用包含在其邻居列表中的、不包含在其自己的列表中的任何新顶点更新其自己的列表。这种查询邻居和更新列表的过程将贯穿整个图表,直到没有顶点能够向它们的列表添加任何新信息。

这种迭代的、以顶点为中心的大型分布式图形并行编程方法基于Google在2009年发表的一篇名为“Pregel:一个大规模图形处理系统”的论文。Pregel基于一个先于MapReduce的分布式计算模型,称为“批量同步并行”(bulk-synchronous parallel)或是BSP。BSP程序将并行处理阶段分为计算和通信两个阶段。在计算阶段,图中的每个顶点检查其自身的内部状态并决定向图形中的其他顶点发送零个或多个消息。在通信阶段,Pregel框架处理将前一通信阶段产生的消息路由到适当的顶点,然后这些顶点处理这些消息,更新它们的内部状态,并在下一个计算阶段潜在地生成新消息。计算和通信步骤的顺序继续进行,直到图中的所有顶点投票停止,此时计算结束。

BSP是最早的并行编程框架之一,它具有相当的通用性和容错性:可以这样设计BSP系统,使得在任何计算阶段的系统的状态可以被捕获和存储,以便如果特定机器发生故障,则该机器的状态可以复制到另一台机器上,在发生故障之前,整个计算可以被回滚到较早的状态,然后计算可以继续。

自从谷歌在Pregel上发表论文以来,许多开源项目已经被开发出来,它们在HDFS上复制BSP编程模型的各个方面,例如Apache Giraph和Apache Hama。这些系统已被证明对专业问题视非常有用的,能很好地符合BSP计算模型,如大规模的PageRank计算,但它们没有被广泛应用作为数据科学家常规分析工具包的一部分,因为难以将它们集成到一个标准的方式来表述数据并行处理工作流。

GraphX解决了这个问题,它允许数据科学家在方便表示数据和实现算法的情况下,轻松地将图形引入数据并行工作流,它还提供了一个内置的pregel运算符,用于在图形上表示BSP计算。在本节中,我们将演示如何使用这个运算符实现迭代的、图形并行计算,以便计算图形的平均路径长度。
1. 算出我们需要跟踪每个顶点的状态。
2. 编写一个考虑当前状态的函数,计算每一对链接顶点,以确定下一阶段发送哪些消息,
3. 写一个函数,将所有不同顶点的消息合并在一起,然后将其传递到顶点进行更新。

有三个主要的事情,我们需要决定,用pregel实现使分布式算法。首先,我们需要决定使用什么数据结构来表示每个顶点的状态,以及使用什么数据结构来表示在顶点之间传递的消息。对于平均路径长度问题,我们希望每个顶点都有一个查找表,其中包含它当前知道的顶点的ID以及这些顶点的距离。我们将存储这些信息在我们为每个顶点维护的Map[VertexId, Int]中。类似地,传递给每个顶点的消息应该是基于顶点从其邻居接收的信息的顶点ID和距离的查找表,并且我们也可以使用Map[VertexId,Int]来表示该信息。

一旦我们知道了用于表示顶点状态和消息内容的数据结构,我们就需要编写两个函数。第一个,我们称之为mergeMaps,,用于将消息从新消息合并到顶点的状态。在这种情况下,状态和消息都是Map[VertexId,Int]类型,因此我们需要合并这两个映射的内容,同时保留与两个映射中出现的任何VertexId实体关联的最小值:
[mw_shl_code=scala,true]def mergeMaps(m1: Map[VertexId, Int], m2: Map[VertexId, Int])
: Map[VertexId, Int] = {
def minThatExists(k: VertexId): Int = {
math.min(
m1.getOrElse(k, Int.MaxValue),
m2.getOrElse(k, Int.MaxValue))
}
(m1.keySet ++ m2.keySet).map {
k => (k, minThatExists(k))
}.toMap
}[/mw_shl_code]
顶点更新函数还包括VertexId值作为参数,所以我们将定义一个简单的更新函数,该函数接受VertexId和Map[VertexId,Int]参数,但是将所有实际工作委托给mergeMaps:
[mw_shl_code=scala,true]def update(
id: VertexId,
state: Map[VertexId, Int],
msg: Map[VertexId, Int]) = {
mergeMaps(state, msg)
}[/mw_shl_code]
因为在算法期间我们要传递的消息也是Map[VertexId,Int]类型的,并且我们希望合并它们并保持它们所拥有的每个密钥的最小值,所以我们也能够在Pregel运行的缩减阶段使用mergeMaps函数。

最后一步通常是最复杂的:我们需要编写代码,根据在每次迭代中从邻居那里接收到的信息,构造将发送到每个顶点的消息。这里的基本思想是,每个顶点应该将当前Map[VertexId,Int]中的每个键的值增加1,使用mergeMaps方法将增加的映射值与来自其邻居的值组合,并且如果相差很大,则将mergeMaps函数的结果发送到相邻顶点从邻居的内部映射[VrTeStID,int ]。执行这个操作序列的代码看起来如下:
[mw_shl_code=scala,true]def checkIncrement(
a: Map[VertexId, Int],
b: Map[VertexId, Int],
bid: VertexId) = {
val aplus = a.map { case (v, d) => v -> (d + 1) }
if (b != mergeMaps(aplus, b)) {
Iterator((bid, aplus))
} else {
Iterator.empty
}
}[/mw_shl_code]
有了checkIncrement函数,我们就可以定义迭代函数,用于在EdgeTriplet内的src和dst顶点的每次Pregel迭代中执行消息更新:
[mw_shl_code=scala,true]def iterate(e: EdgeTriplet[Map[VertexId, Int], _]) = {
checkIncrement(e.srcAttr, e.dstAttr, e.dstId) ++
checkIncrement(e.dstAttr, e.srcAttr, e.srcId)
}[/mw_shl_code]
在每次迭代期间,我们需要根据每个顶点已经知道的路径长度来确定需要与它们进行通信的路径长度,然后需要返回包含元组(VertexId,Map[VertexId,Int])的迭代器,其中第一个顶点Id表示其中消息应该被路由,而Map [VordStand,Int ]是消息本身。

如果在迭代期间任何顶点没有接收到任何消息,则前缀操作符假定该顶点已经完成计算,并且它将被排除在后续处理之外。一旦不再有消息从迭代方法发送到任意顶点,算法就完成了。

现在我们的功能已经完成,让我们准备BSP运行的数据。给定一个足够大的集群和大量的内存,我们可以使用GraphX的Pregel风格算法来计算每对顶点之间的路径长度。然而,这对于我们来说,不需要了解图中路径长度的一般分布;相反,我们可以随机地采样一个小的顶点子集,然后计算每个顶点到该子集的路径长度。使用RDD示例方法,让我们在不替换的情况下为我们的样本选择2%的VertexId值,使用值1729 L作为随机数生成器的种子:
[mw_shl_code=scala,true]val fraction = 0.02
val replacement = false
val sample = interesting.vertices.map(v => v._1).
sample(replacement, fraction, 1729L)
val ids = sample.collect().toSet[/mw_shl_code]
现在,我们将创建一个新的图形对象,如果顶点是采样ID的成员,顶点Map[VertexId,Int ]值只是非空的:
[mw_shl_code=scala,true]val mapGraph = interesting.mapVertices((id, _) => {
if (ids.contains(id)) {
Map(id -> 0)
} else {
Map[VertexId, Int]()
}
})[/mw_shl_code]
最后,为了启动运行,我们需要一个初始消息发送到顶点。对于这个算法,初始消息是一个空Map[VertexId,Int ]。然后,我们可以调用PREGEL方法,接着是update, iterate和mergeMaps函数在每次迭代中执行:
[mw_shl_code=scala,true]val start = Map[VertexId, Int]()
val res = mapGraph.pregel(start)(update, iterate, mergeMaps)[/mw_shl_code]
这应该运行几分钟;算法的迭代次数是1加上示例中最长路径的长度。一旦完成,我们可以flatMap顶点,以便提取(VertexId、VertexId、Int)值的元组,这些值表示计算的唯一路径长度:
[mw_shl_code=scala,true]val paths = res.vertices.flatMap { case (id, m) =>
m.map { case (k, v) =>
if (id < k) {
(id, k, v)
} else {
(k, id, v)
}
}
}.distinct()
paths.cache()[/mw_shl_code]
现在我们可以计算非零路径长度的汇总统计量,并计算样本中的路径长度直方图:
[mw_shl_code=scala,true]paths.map(_._3).filter(_ > 0).stats()
...
(count: 2701516, mean: 3.57,
stdev: 0.84, max: 8.0, min: 1.0)
val hist = paths.map(_._3).countByValue()
hist.toSeq.sorted.foreach(println)
...
(0,248)
(1,5653)
(2,213584)
(3,1091273)
(4,1061114)
(5,298679)
(6,29655)
(7,1520)
(8,38)[/mw_shl_code]
样本的平均路径长度为3.57,而上一节计算的聚类系数为0.274。下表显示了三个不同的小世界网络以及随机图的这些统计值,这些随机图是在与每个真实世界网络相同的顶点和边上生成的,并且取自一篇题为“小世界网络的多尺度可视化”的论文。作品由奥伯等(2003):

图片1.png

IMDB图是由出现在同一部电影中的演员构建的,Mac OS 9网络指的是共同包含在OS 9操作系统源代码中的相同源文件中的头文件,而.edu站点指的是彼此链接的.edu顶级域中的站点。并从阿达姆(1999)的一篇论文中得出。我们的分析表明,MEDLINE引文索引中的MeSH标签网络自然地符合我们在其他知名小世界网络中看到的平均路径长度和聚类系数值的相同范围,并且具有比我们预期的要高得多的聚类系数值。平均路径长度相对较低。

起初,小世界网络是一种好奇心;有趣的是,从社会学和政治学到神经科学和细胞生物学,许多不同类型的真实世界网络具有如此相似和特殊的结构特性。然而,最近,这些网络中偏离小世界结构的现象似乎表明了潜在的功能问题。杜克大学的杰弗里·佩特雷拉博士收集了一些研究表明,大脑神经元网络呈现出一个小世界结构,偏离这种结构发生在被诊断为阿尔茨海默病、精神分裂症、抑郁症和注意缺陷障碍的患者中。一般来说,现实世界的图表应该显示小世界的财产;如果不是,那么这可能是问题的证据,例如小世界交易图表中的欺诈活动或企业之间的信任关系。


已有(3)人评论

跳转到指定楼层
jiangzi 发表于 2018-9-7 11:33:44
Spark 高级分析:第七章第14节 用~~~
回复

使用道具 举报

jiangzi 发表于 2018-9-8 11:08:36
第七章第14节 用Pregel计算平均路径长度~~~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条