分享

join实践: 万亿级数据量任务优化历程

Mirinda 2022-4-11 11:54:43 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 2625
本帖最后由 Mirinda 于 2022-4-11 12:09 编辑

问题导读:
1.你熟悉join吗?
2.文中三次优化有什么不同?
3.如果是你你会怎么优化?


优化前
  1. SELECT  count(*)
  2. FROM    tbl_0 a
  3. JOIN    tbl_1 b
  4. ON      a.ds = 20220310
  5. AND     b.ds = 20220310
  6. AND     a.key = b.key
  7. ;
复制代码
大概执行2h,  还未得出结果。

第一次优化
暴力增加join 的并行度, 没有什么优化是比加资源来得更直接。
  1. set odps.sql.joiner.instances=1000; //表示join 的并行度加到1000
  2. SELECT  count(*)
  3. FROM    tbl_0 a
  4. JOIN    tbl_1 b
  5. ON      a.ds = 20220310
  6. AND     b.ds = 20220310
  7. AND     a.key = b.key
  8. ;
复制代码
大概执行2h,  仍未得出结果。

第二次优化
重新分析两张表数据量,a 表数据量750w+,  b 表数据量350w+, 在未做任何优化情况下数据是需要经过shuffle, 将相同的key分布到相同的节点上, 首先考虑使用mapjoin 解决,使其不用执行shuffle操作。
  1. SELECT /*+mapjoin(b)*/ count(*)
  2. FROM    tbl_0 a
  3. JOIN    tbl_1 b
  4. ON      a.ds = 20220310
  5. AND     b.ds = 20220310
  6. AND     a.key = b.key
  7. ;
复制代码
大概执行2h,  仍未得出结果。

第三次优化
重新分析表数据分布情况, 查看a、b 两张表的join-key 的数据情况:
  1. SELECT
  2.         key
  3.         ,count(*)
  4. FROM    tbl_0/tbl_1
  5. WHERE   ds =20220312
  6. GROUP BY KEY
  7. ORDER BY count(*) desc;  
复制代码

a 表

WorkWell
1586079
GoodQuality
1428452
ProductExperience
1186742
BuyerRecomendSeller
1147469
UserExperience
763998


b表

ProductExperience
832075
UserExperience
309142
GoodQuality
245208
BuyerRecomendSeller
213484
SPS_Material
196508

两张表的key 的类型不多,但是单个key值的个数比较多,例如
GoodQuality 在a表中1428452条记录,在b表中245208条记录,最终就会产生 1428452*245208=3500亿的数据量,这样相同的GoodQuality 分布到同一个节点去处理,很明显发生数据长尾效应。对于这样的情况,普通的mapjoin 或者是sort-merge已经不适合了,需要尽可能的将key分散,分发到不同的节点去处理,因此使用随机前缀+扩容的方式处理。
什么是随机前缀+扩容?对其中一张表数据量扩容n倍,另外一张表对join-key生成随机0~n的随机前缀数据,通过这种方式将join-key充分打散到下游不同的节点处理,以达到优化效果。在这里通过定义udf 实现随机前缀, udtf实现数据扩容:
  1. //生成max以内的随机数
  2. public class RandomData extends UDF {
  3.      public Random r;
  4.      @Override
  5.     public void setup(ExecutionContext ctx) throws UDFException {
  6.       r=new Random();
  7.     }
  8.     public Integer evaluate(Integer max) {
  9.         return  r.nextInt(max);
  10.     }
  11. }
复制代码

  1. //数据量扩充
  2. public class ExpandData extends UDTF {
  3.     @Override
  4.     public void setup(ExecutionContext ctx) throws UDFException {
  5.     }
  6.     @Override
  7.     public void process(Object[] args) throws UDFException {
  8.       Long expand=(Long)(args[0]);//代表了扩充的倍数
  9.       Object[] args1=new Object[args.length];
  10.       for(int i=0;i<expand;i++){
  11.            for(int j=0;j<args.length;j++){
  12.                args1[j]=i+"_"+args[j];
  13.            }
  14.           super.forward(args1);
  15.       }
  16.     }
  17.     @Override
  18.     public void close() throws UDFException {
  19.     }
  20. }
复制代码
然后重新执行SQL:
  1. set odps.sql.joiner.instances=1000;
  2. SELECT
  3.   count(*)
  4. from (
  5. select *, CONCAT_WS('_',RandomData(1000),key) newKey  from  tbl_0
  6. where ds=20220312
  7. ) a join (
  8. SELECT  newKey from (
  9. SELECT
  10. key
  11. from
  12. tbl_1  where ds=20220312)
  13. LATERAL view  ExpandData(1000,key) tmp as cnt,newKey
  14. ) b on a.newKey=b.newKey;
复制代码
耗时20min左右得出结果,最终得到的结果大于一万亿。


最新经典文章,欢迎关注公众号


原文链接:https://mp.weixin.qq.com/s/VdR874eGPp5gSr7YAiBaKA

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条