分享

关于Pig性能优化

Mirinda 发表于 2015-8-22 20:53:34 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 22389


问题导读

1.Combiner的作用是什么?
2.Join优化有哪些措施?
3.如何使用压缩来提高性能?







1. 尽早去除无用的数据

  MapReduce Job的很大一部分开销在于磁盘IO和数据的网络传输,如果能尽早的去除无用的数据,减少数据量,会提升Pig的性能。

1). 尽早的使用Filter

  使用Filter可以去除数据中无用的行(Record),尽早的Filter掉无用的数据,可以减少数据量,提升Pig性能。

  2). 尽早的使用Project(Foreach Generate)

  使用Foreach Generate可以去除数据中无用的列(Column),减少数据量,提升Pig性能。

  2. 使用Combiner

  Combiner可以对Map的结果进行combine,减少Shuffle的数据量。


  在Pig中实现UDF时,应该尽可能地实现Algebraic接口(实现Algebraic接口的function可以对中间结果执行多次而不影响最终结果,比如Count, Sum等都是Algebraic function),这样的UDF就可能进行Combine,条件如下:


  如果在Group之后的Foreach语句中,所有投影都是针对分组(Group)的列的表达式,或者是Algebraic UDF的表达式时,就可以使用Combiner(表达式包括Sum,Count,Distinct或者其他数学表达式等)。

3. Join优化

  当进行Join时,最后一个表不会放入内存,而是以stream的方式进行处理,所以最好把最大的一个表放置到Join语句的最后。


  Pig实现了以下三种定制的Join以进一步优化。

1) Replicated Join

  当进行Join的一个表比较大,而其他的表都很小(能够放入内存)时,Replicated Join会非常高效。


  Replicated Join会把所有的小表放置在内存当中,然后在Map中读取大表中的数据记录,和内存中存储的小表的数据进行Join,得到Join结果,无需Reduce。


  可以在Join时使用 Using 'replicated'语句来触发Replicated Join,大表放置在最左端,其余小表(可以有多个)放置在右端。

  2) Skewed Join

当进行Join的两个表中,一个表数据记录针对key的分布极其不均衡的时候,简单的使用Hash来分配Reduce端的key时,可能导致某些Reducer上的数据量特别大,降低整个集群的性能。


  Skewed Join可以首先对左边的表的key统计其分布,然后决定Reduce端的key的分布,尽量使得Reduce端的数据分布比较均衡。


  可以在Join时使用Using 'skewed'语句来触发Skewed Join,需要进行统计的表(亦即key可能分布不均衡的表)放置在左端。

  3) Merge Join

当进行Join的两个表都已经是有序的时,可以使用Merge Join。


  Join时,首先对右端的表进行一次采样,对采样的数据创建索引,记录(key, 文件名, 偏移[offset])。然后进行map,读取Join左边的表,对于每一条数据记录,根据前一步计算好的索引来查找数据,进行Join。


  可以在Join时使用Using 'merge'语句来触发Merge Join,需要创建索引的表放置在右端。


  另外,在进行Join之前,首先过滤掉key为Null的数据记录可以减少Join的数据量。

  4. 使用压缩来提高性能

通过压缩Map/Reduce之间的数据,以及Job之间需要传输的数据,可以显著的减少需要存储在硬盘上的和需要传输的数据,提升Pig的性能。

  1) 压缩Map/Reduce之间的数据


  通过设置mapred.compress.map.output = true可以对Map的结果进行压缩,压缩的方法可以通过下面的语句来进行设置:mapred.map.output.compression.codec = org.apache.hadoop.io.compress.GzipCodec / com.hadoop.compression.lzo.LzopCodec。


  Gzip的压缩效率比较高,但是比较消耗CPU,所以通常情况下可以使用Lzo来进行压缩。

  2) 压缩Job之间的数据

  通过设置pig.tmpfilecompression = true可以对Job之间的数据进行压缩,压缩的方法可以通过pig.tmpfilecompres sion.codec = org.apache.hadoop.io.compress.GzipCodec / com.hadoop.compression.lzo.LzopCodec来进行设置。

  5. 设置Reduce的并发数

可以通过PARALLEL = n 来设置Reduce的并发数(Map的并发数不可以设置),可以启动Reduce的操作包括:


  COGROUP, CROSS, DISTINCT, GROUP, JOIN (inner), JOIN (outer), 和 ORDER BY。

  需要注意的是,PARALLEL并不是越大越好,这需要根据集群的配置来确定,比较合理的PARALLEL数 = 集群节点数*mapred.tasktracker.reduce.tasks.maximum。后者默认为2。





本帖被以下淘专辑推荐:

已有(2)人评论

跳转到指定楼层
wangb 发表于 2016-5-31 21:44:50
学习了,楼主辛苦了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条