分享

RDD迭代元算问题,求助大家

ltne 发表于 2017-3-4 20:56:43 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 7 8356
问题:比如我有八个数据0 1 2 3 4 5 6 7迭代多次,比如:
第一次:
0和4:位置的两个元素运算生成两个新的元素放到0和4位置上;
1和5:位置的两个元素运算生成两个新的元素放到1和5位置上;
2和6:位置的两个元素运算生成两个新的元素放到2和6位置上;
3和7:位置的两个元素运算生成两个新的元素放到3和7位置上;

第二次:
0和2:位置的两个元素运算生成两个新的元素放到0和2位置上;
1和3:位置的两个元素运算生成两个新的元素放到1和3位置上;
4和6:位置的两个元素运算生成两个新的元素放到4和6位置上;
5和7:位置的两个元素运算生成两个新的元素放到5和7位置上;


第n次。。。
总之每次都是对应位置两个元素运算重新生成两个两个元素放到对应的位置,这个位置我们可以通过key标志;
val pairDatas;
while (i <= calNum) {
pairDatas = datas.zipWithIndex().map(x => (x._2, x._1)).groupByKeyval pairAnsDatas = pairDatas.map(func)  //生成一个pair,其中key和value分别代表两个位置上的新生成的元素;
pairDatas = pairAnsDatas.keys.union(pairAnsDatas.values)

上面的代码会多次迭代,迭代运算pairDatas多次迭代后会导致pairDatas 的partitions数目指数增长,每迭代一次增长一倍,
最后会出现pairDatas.count < pairDatas.getNumPartitions 的情况,此处会因为partitions过多而导致task数目过多,
导致运算速度变慢,大家有什么好的处理方案吗? QQ截图20170304210031.png


已有(7)人评论

跳转到指定楼层
starrycheng 发表于 2017-3-4 22:10:47
这个需要从根本解决,应该换个思路。今天太晚了,明天帮你想想
回复

使用道具 举报

einhep 发表于 2017-3-5 07:49:55
用缓存吧cache(),这样就不用每次重复计算了。尝试下面,或则楼主在优化改进下
val pairDatas;
while (i <= calNum) {
pairDatas = datas.zipWithIndex().map(x => (x._2, x._1)).cache().groupByKey
val pairAnsDatas = pairDatas.map(func).cache()  //生成一个pair,其中key和value分别代表两个位置上的新生成的元素;
pairDatas = pairAnsDatas.keys.union(pairAnsDatas.values).cache()

推荐参考
spark缓存cache()详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21120







回复

使用道具 举报

ltne 发表于 2017-3-5 16:41:07
einhep 发表于 2017-3-5 07:49
用缓存吧cache(),这样就不用每次重复计算了。尝试下面,或则楼主在优化改进下
val pairDatas;
while (i ...

此处我加上catch后效果不大,这里主要问题是因为每次运算生成两个数据,然后采用union结合新生成的数据组成一个新的RDD,此处会导致partitions数目翻倍,在下次迭代的时候会导致task的数目过多,执行时间变慢,甚至会导致rdd.count < rdd.getNumPartitions的情况,出现分区数量少于数据数量的问题。
回复

使用道具 举报

ltne 发表于 2017-3-5 16:42:26
starrycheng 发表于 2017-3-4 22:10
这个需要从根本解决,应该换个思路。今天太晚了,明天帮你想想

请大神给个好的思路,急求
回复

使用道具 举报

starrycheng 发表于 2017-3-5 17:40:20
ltne 发表于 2017-3-5 16:42
请大神给个好的思路,急求

思路来自对业务场景的了解。否则不行的。要么就不要迭代。从目前的情况来看,只能优化集群,调整配置参数减少任务数。
spark_home/conf/spark-default.conf配置文件设置:如下面参数等
spark.sql.shuffle.partitions  
spark.default.parallelism

回复

使用道具 举报

ltne 发表于 2017-3-5 18:58:00
starrycheng 发表于 2017-3-5 17:40
思路来自对业务场景的了解。否则不行的。要么就不要迭代。从目前的情况来看,只能优化集群,调整配置参数 ...

我将pairDatas = pairAnsDatas.keys.union(pairAnsDatas.values)添加coalesce

pairDatas = pairAnsDatas.keys.union(pairAnsDatas.values).coalesce(4)
这样使得每次迭代数据始终是四个partition,task数目始终是四个,速度提升了一些,但是感觉还有优化的地方,您在看看还有什么建议,其实这个算法是FFT(快速傅立叶变化)的实现,如果您有好的实现思路,一定要告诉我,跪求
回复

使用道具 举报

ltne 发表于 2017-3-9 21:25:11
大神求助啊,coalesce存在大量的shuffle,速度蜗牛般慢啊,该如何处理呢,FFT 算法有什么好的思路吗
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条