RDD:基于内存的集群计算容错抽象(二)
本帖最后由 pig2 于 2015-3-19 22:23 编辑问题导读
1、RDD程序的调试工具是什么?
2、如何学习可迭代的机器应用?
static/image/hrline/4.gif
6. 实现
6.1 解释器的集成
像Ruby和Python一样,Scala也有一个交互式shell。基于内存的数据可以实现低延时,我们希望允许用户从解释器交互式地运行Spark,从而在大数据集上实现大规模并行数据挖掘。
Scala解释器通常根据将用户输入的代码行,来对类进行编译,接着装载到JVM中,然后调用类的函数。这个类是一个包含输入行变量或函数的单例对象,并在一个初始化函数中运行这行代码。例如,如果用户输入代码var x = 5,接着又输入println(x),则解释器会定义一个包含x的Line1类,并将第2行编译为println(Line1.getInstance().x)。
在Spark中我们对解释器做了两点改动:
类传输:解释器能够支持基于HTTP传输类字节码,这样worker节点就能获取输入每行代码对应的类的字节码。
改进的代码生成逻辑:通常每行上创建的单态对象通过对应类上的静态方法进行访问。也就是说,如果要序列化一个闭包,它引用了前面代码行中变量,比如上面的例子Line1.x,Java不会根据对象关系传输包含x的Line1实例。所以worker节点不会收到x。我们将这种代码生成逻辑改为直接引用各个行对象的实例。图7说明了解释器如何将用户输入的一组代码行解释为Java对象。
图7 Spark解释器如何将用户输入的两行代码解释为Java对象
Spark解释器便于跟踪处理大量对象关系引用,并且便利了HDFS数据集的研究。我们计划以Spark解释器为基础,开发提供高级数据分析语言支持的交互式工具,比如类似SQL和Matlab。
6.2 缓存管理
Worker节点将RDD分区以Java对象的形式缓存在内存中。由于大部分操作是基于扫描的,采取RDD级的LRU(最近最少使用)替换策略(即不会为了装载一个RDD分区而将同一RDD的其他分区替换出去)。目前这种简单的策略适合大多数用户应用。另外,使用带参数的cache操作可以设定RDD的缓存优先级。
6.3 rddbg:RDD程序的调试工具
RDD的初衷是为了实现容错以能够再计算(re-computation),这个特性使得调试更容易。我们创建了一个名为rddbg的调试工具,它是通过基于程序记录的Lineage信息来实现的,允许用户:(1)重建任何由程序创建的RDD,并执行交互式查询;(2)使用一个单进程Java调试器(如jdb)传入计算好的RDD分区,能够重新运行作业中的任何任务。
我们强调一下,rddbg不是一个完全重放的调试器:特别是不对非确定性的代码或动作进行重放。但如果某个任务一直运行很慢(比如由于数据分布不均匀或者异常输入等原因),仍然可以用它来帮助找到其中的逻辑错误和性能问题。
举个例子,我们使用rddbg去解决用户Spam分类作业中的一个bug,这个作业中的每次迭代都产生0值。在调试器中重新执行reduce任务,很快就能发现,输入的权重向量(存储在一个用户自定义的向量类中)竟然是空值。由于从一个未初始化的稀疏向量中读取总是返回0,运行时也不会抛出异常。在这个向量类中设置一个断点,然后运行这个任务,引导程序很快就运行到设置的断点处,我们发现向量类的一个数组字段的值为空,我们诊断出了这个bug:稀疏向量类中的数据字段被错误地使用transient来修饰,导致序列化时忽略了该字段的数据。
rddbg给程序执行带来的开销很小。程序本来就需要将各个RDD中的所有闭包序列化并通过网络传送,只不过使用rddbg同时还要将这些闭集记录到磁盘。
7. 评估
我们在Amazon EC2上进行了一系列实验来评估Spark及RDD的性能,并与Hadoop及其他应用程序的基准进行了对比。总的说来,结果如下:
(1)对于迭代式机器学习应用,Spark比Hadoop快20多倍。这种加速比是因为:数据存储在内存中,同时Java对象缓存避免了反序列化操作。
(2)用户编写的应用程序执行结果很好。例如,Spark分析报表比Hadoop快40多倍。
(3)如果节点发生失效,通过重建那些丢失的RDD分区,Spark能够实现快速恢复。
(4)Spark能够在5-7s延时范围内,交互式地查询1TB大小的数据集。
我们基准测试首先从一个运行在Hadoop上的具有迭代特征的机器学习应用(7.1)和PageRank(7.2)开始,然后评估在Spark中当工作集不能适应缓存(7.4)时系统容错恢复能力(7.3),最后讨论用户应用程序(7.5)和交互式数据挖掘(7.6)的结果。除非特殊说明,我们的实验使用m1.xlarge EC2 节点,4核15GB内存,使用HDFS作为持久存储,块大小为256M。在每个作业运行执行时,为了保证磁盘读时间更加精确,我们清理了集群中每个节点的操作系统缓存。
7.1 可迭代的机器学习应用
我们实现了2个迭代式机器学习(ML)应用,Logistic回归和K-means算法,与如下系统进行性能对比:
Hadoop:Hadoop 0.20.0稳定版。
HadoopBinMem:在首轮迭代中执行预处理,通过将输入数据转换成为开销较低的二进制格式来减少后续迭代过程中文本解析的开销,在HDFS中加载到内存。
Spark:基于RDD的系统,在首轮迭代中缓存Java对象以减少后续迭代过程中解析、反序列化的开销。
我们使用同一数据集在相同条件下运行Logistic回归和K-means算法:使用400个任务(每个任务处理的输入数据块大小为256M),在25-100台机器,执行10次迭代处理100G输入数据集(表4)。两个作业的关键区别在于每轮迭代单个字节的计算量不同。K-means的迭代时间取决于更新聚类坐标耗时,Logistic回归是非计算密集型的,但是在序列化和解析过程中非常耗时。
由于典型的机器学习算法需要数10轮迭代,然后再合并,我们分别统计了首轮迭代和后续迭代计算的耗时,并从中发现,在内存中缓存RDD极大地加快了后续迭代的速度。
表4 用于Spark基准程序的数据
应用数据描述大小
Logistic回归10亿9维点数据100G
K-means10亿10维点数据(k=10)100G
PageRank400万Wikipedia文章超链接图49G
交互式数据挖掘Wikipedia浏览日志(2008-10~2009-4)1TB
首轮迭代。在首轮迭代过程中,三个系统都是从HDFS中读取文本数据作为输入。图9中“First Iteration”显示了首轮迭代的柱状图,实验中Spark快于Hadoop,主要是因为Hadoop中的各个分布式组件基于心跳协议来发送信号带来了开销。HadoopBinMem是最慢的,因为它通过一个额外的MapReduce作业将数据转换成二进制格式。
图8 首轮迭代后Hadoop、HadoopBinMen、Spark运行时间对比
后续迭代。图9显示了后续迭代的平均耗时,图8对比了不同聚类大小条件下耗时情况,我们发现在100个节点上运行Logistic回归程序,Spark比Hadoop、HadoopBinMem分别快25.3、20.7倍。从图8(b)可以看到,Spark仅仅比Hadoop、HadoopBinMem分别快1.9、3.2倍,这是因为K-means程序的开销取决于计算(用更多的节点有助于提高计算速度的倍数)。
后续迭代中,Hadoop仍然从HDFS读取文本数据作为输入,所以从首轮迭代开始Hadoop的迭代时间并没有明显的改善。使用预先转换的SequenceFile文件(Hadoop内建的二进制文件格式),HadoopBinMem在后续迭代中节省了解析的代价,但是仍然带来的其他的开销,如从HDFS读SequenceFile文件并转换成Java对象。因为Spark直接读取缓存于RDD中的Java对象,随着聚类尺寸的线性增长,迭代时间大幅下降。
图9 首轮及其后续迭代平均时间对比
理解速度提升。
我们非常惊奇地发现,Spark甚至胜过了基于内存存储二进制数据的Hadoop(HadoopBinMem),幅度高达20倍之多,Hadoop运行慢是由于如下几个原因:
Hadoop软件栈的最小开销
读数据时HDFS栈的开销
将二进制记录转换成内存Java对象的代价
为了估测1,我们运行空的Hadoop作业,仅仅执行作业的初始化、启动任务、清理工作就至少耗时25秒。对于2,我们发现为了服务每一个HDFS数据块,HDFS进行了多次复制以及计算校验和操作。
为了估测3,我们在单个节点上运行了微基准程序,在输入的256M数据上计算Logistic回归,结果如表5所示。首先,在内存中的HDFS文件和本地文件的不同导致通过HDFS接口读取耗时2秒,甚至数据就在本地内存中。其次,文本和二进制格式输入的不同造成了解析耗时7秒的开销。最后,预解析的二进制文件转换为内存中的Java对象,耗时3秒。每个节点处理多个块时这些开销都会累积起来,然而通过缓存RDD作为内存中的Java对象,Spark只需要耗时3秒。
表5 Logistic回归迭代时间
内存中的HDFS文件内存中的本地文件缓存的RDD
文本输入
二进制输入15.38 (0.26)
8.38 (0.10)13.13 (0.26)
6.86 (0.02)2.93 (0.31)
2.93 (0.31)
7.2 PageRank
通过使用存储在HDFS上的49G Wikipedia导出数据,我们比较了使用RDD实现的Pregel与使用Hadoop计算PageRank的性能。PageRank算法通过10轮迭代处理了大约400万文章的链接图数据,图10显示了在30个节点上,Spark处理速度是Hadoop的2倍多,改进后对输入进行Hash分区速度提升到2.6倍,使用Combiner后提升到3.6倍,这些结果数据也随着节点扩展到60个时同步放大。
图10 迭代时间对比
7.3 容错恢复
基于K-means算法应用程序,我们评估了在单点故障(SPOF)时使用Lneage信息创建RDD分区的开销。图11显示了,K-means应用程序运行在75个节点的集群中进行了10轮迭代,我们在正常操作和进行第6轮迭代开始时一个节点发生故障的情况下对耗时进行了对比。没有任何失败,每轮迭代启动了400个任务处理100G数据。
图11 SPOF时K-means应用程序迭代时间
第5轮迭代结束时大约耗时58秒,第6轮迭代时Kill掉一个节点,该节点上的任务都被终止(包括缓存的分区数据)。Spark调度器调度这些任务在其他节点上重新并行运行,并且重新读取基于Lineage信息重建的RDD输入数据并进行缓存,这使得迭代计算耗时增加到80秒。一旦丢失的RDD分区被重建,平均迭代时间又回落到58秒。
7.4 内存不足时表现
到现在为止,我们能保证集群中的每个节点都有足够的内存去缓存迭代过程中使用的RDD,如果没有足够的内存来缓存一个作业的工作集,Spark又是如何运行的呢?在实验中,我们通过在每个节点上限制缓存RDD所需要的内存资源来配置Spark,在不同的缓存配置条件下执行Logistic回归,结果如图12。我们可以看出,随着缓存的减小,性能平缓地下降。
图12 Spark上运行Logistic回归的性能表现
7.5 基于Spark构建的用户应用程序
In-Memory分析。视频分发公司Conviva使用Spark极大地提升了为客户处理分析报告的速度,以前基于Hadoop使用大约20个Hive查询来完成,这些查询作用在相同的数据子集上(满足用户提供的条件),但是在不同分组的字段上执行聚合操作(SUM、AVG、COUNT DISTINCT等)需要使用单独的MapReduce作业。该公司使用Spark只需要将相关数据加载到内存中一次,然后运行上述聚合操作,在Hadoop集群上处理200G压缩数据并生成报耗时20小时,而使用Spark基于96G内存的2个节点耗时30分钟即可完成,速度提升40倍,主要是因为不需要再对每个作业重复地执行解压缩和过滤操作。
城市交通建模。在Berkeley的Mobile Millennium项目中,基于一系列分散的汽车GPS监测数据,研究人员使用并行化机器学习算法来推算公路交通拥堵状况。数据来自市区10000个互联的公路线路网,还有600000个由汽车GPS装置采集到的样本数据,这些数据记录了汽车在两个地点之间行驶的时间(每一条路线的行驶时间可能跨多个公路线路网)。使用一个交通模型,通过推算跨多个公路网行驶耗时预期,系统能够估算拥堵状况。研究人员使用Spark实现了一个可迭代的EM算法,其中包括向Worker节点广播路线网络信息,在E和M阶段之间执行reduceByKey操作,应用从20个节点扩展到80个节点(每个节点4核),如图13(a)所示:
图13 每轮迭代运行时间(a)交通建模应用程序(b)基于Spark的社交网络的Spam分类
社交网络Spam分类。Berkeley的Monarch项目使用Spark识别Twitter消息上的Spam链接。他们在Spark上实现了一个类似7.1小节中示例的Logistic回归分类器,不同的是使用分布式的reduceByKey操作并行对梯度向量求和。图13(b)显示了基于50G数据子集训练训练分类器的结果,整个数据集是250000的URL、至少10^7个与网络相关的特征/维度,内容、词性与访问一个URL的页面相关。随着节点的增加,这并不像交通应用程序那样近似线性,主要是因为每轮迭代的固定通信代价较高。
7.6 交互式数据挖掘
为了展示Spark交互式处理大数据集的能力,我们在100个m2.4xlarge EC2实例(8核68G内存)上使用Spark分析1TB从2008-10到2009-4这段时间的Wikipedia页面浏览日志数据,在整个输入数据集上简单地查询如下内容以获取页面浏览总数:(1)全部页面;(2)页面的标题能精确匹配给定的关键词;(3)页面的标题能部分匹配给定的关键词。
图14 显示了分别在整个、1/2、1/10的数据上查询的响应时间,甚至1TB数据在Spark上查询仅耗时5-7秒,这比直接操作磁盘数据快几个数量级。例如,从磁盘上查询1TB数据耗时170秒,这表明了RDD缓存使得Spark成为一个交互式数据挖掘的强大工具。
8. 相关工作
分布式共享内存(DSM)。RDD可以看成是一个基于DSM研究得到的抽象。在2.5节我们讨论过,RDD提供了一个比DSM限制更严格的编程模型,并能在节点失效时高效地重建数据集。DSM通过检查点实现容错,而Spark使用Lineage重建RDD分区,这些分区可以在不同的节点上重新并行处理,而不需要将整个程序回退到检查点再重新运行。RDD能够像MapReduce一样将计算推向数据,并通过推测执行来解决某些任务计算进度落后的问题,推测执行在一般的DSM系统上是很难实现的。
In-Memory集群计算。Piccolo是一个基于可变的、In-Memory的分布式表的集群编程模型。因为Piccolo允许读写表中的记录,它具有与DSM类似的恢复机制,需要检查点和回滚,但是不能推测执行,也没有提供类似groupBy、sort等更高级别的数据流算子,用户只能直接读取表单元数据来实现。可见,Piccolo是比Spark更低级别的编程模型,但是比DSM要高级。
RAMClouds适合作为Web应用的存储系统,它同样提供了细粒度读写操作,所以需要通过记录日志来实现容错。
数据流系统。RDD借鉴了DryadLINQ、Pig和FlumeJava的“并行收集”编程模型,通过允许用户显式地将未序列化的对象保存在内存中,以此来控制分区和基于key随机查找,从而有效地支持基于工作集的应用。RDD保留了那些数据流系统更高级别的编程特性,这对那些开发人员来说也比较熟悉,而且,RDD也能够支持更多类型的应用。RDD新增的扩展,从概念上看很简单,其中Spark是第一个使用了这些特性的系统,类似DryadLINQ编程模型,能够有效地支持基于工作集的应用。
面向基于工作集的应用,已经开发了一些专用系统,像Twister、HaLoop实现了一个支持迭代的MapReduce模型;Pregel,支持图应用的BSP计算模型。RDD是一个更通用的抽象,它能够描述支持迭代的MapReduce、Pregel,还有现有一些系统未能处理的应用,如交互式数据挖掘。特别地,它能够让开发人员动态地选择操作来运行在RDD上(如查看查询的结果以决定下一步运行哪个查询),而不是提供一系列固定的步骤去执行迭代,RDD还支持更多类型的转换。
最后,Dremel是一个低延迟查询引擎,它面向基于磁盘存储的大数据集,这类数据集是把嵌套记录数据生成基于列的格式。这种格式的数据也能够保存为RDD并在Spark系统中使用,但Spark也具备将数据加载到内存来实现快速查询的能力。
Lineage。我们通过参考到做过调研,在科学计算和数据库领域,对于一些应用,如需要解释结果以及允许被重新生成、工作流中发现了bug或者数据集丢失需要重新处理数据,表示数据的Lineage和原始信息一直以来都是一个研究课题。RDD提供了一个受限的编程模型,在这个模型中使用细粒度的Lineage来表示是非常容易的,因此它可以被用于容错。
缓存系统。Nectar能够通过识别带有程序分析的子表达式,跨DryadLINQ作业重用中间结果,如果将这种能力加入到基于RDD的系统会非常有趣。但是Nectar并没有提供In-Memory缓存,也不能够让用户显式地控制应该缓存那个数据集,以及如何对其进行分区。Ciel同样能够记住任务结果,但不能提供In-Memory缓存并显式控制它。
语言迭代。DryadLINQ能够使用LINQ获取到表达式树然后在集群上运行,Spark系统的语言集成与它很类似。不像DryadLINQ,Spark允许用户显式地跨查询将RDD存储到内存中,并通过控制分区来优化通信。Spark支持交互式处理,但DryadLINQ却不支持。
关系数据库。从概念上看,RDD类似于数据库中的视图,缓存RDD类似于物化视图。然而,数据库像DSM系统一样,允许典型地读写所有记录,通过记录操作和数据的日志来实现容错,还需要花费额外的开销来维护一致性。RDD编程模型通过增加更多限制来避免这些开销。
9. 总结
我们提出的RDD是一个面向,运行在普通商用机集群之上并行数据处理应用的分布式内存抽象。RDD广泛支持基于工作集的应用,包括迭代式机器学习和图算法,还有交互式数据挖掘,然而它保留了数据流模型中引人注目的特点,如自动容错恢复,处理执行进度落后的任务,以及感知调度。它是通过限制编程模型,进而允许高效地重建RDD分区来实现的。RDD实现处理迭代式作业的速度超过Hadoop大约20倍,而且还能够交互式查询数百G数据。
本文接昨天的上一篇:RDD:基于内存的集群计算容错抽象(一)
页:
[1]