分享

Impala性能调整一

52Pig 2014-10-7 15:24:45 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 14519
本帖最后由 pig2 于 2014-10-14 09:14 编辑
阅读导读:
1.影响Impala功能性能的各种因素?
2.如何对Impala查询进行性能调整和监控?








在开始任何性能调整和基准测试之前,请确保你的系统已经按照 Post-Installation Configuration for Impala 中的设置进行配置。
  • Partitioning. 这一技术基于频繁查询的列上的不同的值,把数据物理拆分开来,允许查询跳过读取表中很大部分的数据

  • Performance Considerations for Join Queries. 相对于修改物理因素,如文件格式或硬件配置,连接是你可以在 SQL 层级进行调整的主要方面(Joins are the main class of queries that you can tune at the SQL level, as opposed to changing physical factors such as the file format or the hardware configuration)。对于连接的性能,相关的主题 Column StatisticsTable Statistics 同样重要

  • Table Statistics and Column Statistics. 使用 COMPUTE STATS 语句,采集表和列的统计信息,帮助 Impala 自动优化连接查询的性能,而不需要修改 SQL 查询语句(在 Impala 1.2.2 及以上版本,这一过程特别简单,因为 COMPUTE STATS 语句在同一个语句中同时采集两种信息,而且不需要像之前在 Hive 中运行 ANALYZE TABLE 语句那样,不再需要执行任何设置和配置)

  • Testing Impala Performance. 在进行任何基准测试之前,执行一些安装后测试(post-setup testing),以确保 Impala 使用了性能最优的设置

  • Benchmarking Impala Queries. 用于 Impala 初始实验的配置和样本数据通常不适合进行性能测试(The configuration and sample data that you use for initial experiments with Impala is often not appropriate for doing performance tests)

  • Controlling Resource Usage. 更多内存,更加性能(The more memory Impala can utilize, the better query performance you can expect)。在一个同样运行其他负载的集群中,你必须权衡考虑,保证所有 Hadoop 组件具有能良好运行的足够内存,因此你可能限制 Impala 可使用的内存


分区
表的所有数据文件默认放在一个目录下。分区是一项基于一个或多个上的值,在载入时物理拆分数据的技术。例如,对于根据 year 列分区的school_records 表来说,对于每一个不同的年份都有一个单独的数据目录,并且这一年的所有数据都存放在这个目录下的数据文件中。一个包含 WHERE 条件如 YEAR=1966, YEAR IN (1989,1999), YEAR BETWEEN 1984 AND 1989 的查询,可以只从对应的一个或多个目录下检索数据文件,极大的减少了读取和测试的数据的数量。
分区通常对应:
非常大的表,完整读取整个数据集花费的时间不可想象(where reading the entire data set takes an impractical amount of time)
全部或几乎所有的查询都包含分区列查询条件的表。我们上面的例子中那个根据 year 分区的表, SELECT COUNT(*) FROM school_records WHERE year = 1985 是高效的,只检索数据的一小部分;但是 SELECT COUNT(*) FROM school_records 则必须处理每一年的单独的数据文件,导致必未分区表更多的工作。假如你频繁基于 last name, student ID, 等等不检测年份的对表进行查询,考虑不分区表

列包含合理的基数(cardinality--不同值的个数)。假如列只包括少量的值,如 Male 或 Female,你无法通过对每一个查询消除大约 50% 数据的读取来获得更高的效率。假如列的每一个值只对应很少的行,要处理的目录的数量会变成一个限制因素,并且每一个目录中的数据文件可能太小了,无法从 Hadoop 以 multi-megabyte 块传输数据的机制受益。例如,你可能用年来分区人口数据,用年和月来存放销售数据,用年月日来分区网络流量数据(一些更高流量的用户甚至用小时和分钟来分区数据)

总是使用抽取、转换、加载(ETL)管道加载的数据。分区列的值从原始的数据文件剥离,并对应到目录名中,因此加载数据到分区表涉及了某种转换或预处理(The values of the partitioning columns are stripped from the original data files and represented by directory names, so loading data into a partitioned table involves some sort of transformation or preprocessing)

在 Impala SQL 语法中,分区会影响到这些语句:
  • CREATE TABLE: 在创建表时,使用 PARTITIONED BY 子句来标识分区列的名称和数据类型。表中的列不包括这些分区列
  • ALTER TABLE: 可以添加或删除分区,用于处理海量数据集的不同部分。对于根据日期值分区的数据,你可以不再保留"过期(age out)"的数据
  • INSERT: 当向分区表插入数据时,需要标识分区列。对于插入的每一行,分区列的值没有保存在数据文件里,而是根据行存储的目录名称确定。也可以使用 INSERT OVERWRITE 语句来加载一组数据到指定的分区;你可以替换指定分区的内容但是不能向指定分区追加数据
  • 尽管表分区与否的 SELECT 语句的语法相同,对分区表的查询方式可能对性能和可扩展性产生戏剧性的影响。在查询过程中,让查询跳过某些分区的机制称作分区修剪(partition pruning);参见 Partition Pruning for Queries 了解详细信息

参见 Attaching an External Partitioned Table to an HDFS Directory Structure 中的例子,演示了创建分区表的语法,HDFS 中底层的目录结构,以及如何连接到 Impala 外部分区表中存出来 HDFS 其他位置的数据文件(how to attach a partitioned Impala external table to data files stored elsewhere in HDFS)

参见 Partitioning for Parquet Tables 了解 Parquet 分区表的性能注意事项。
参见 NULL 了解分区表中 NULL 值如何对应。

针对查询进行分区修剪(Partition Pruning)
分区修剪(Partition pruning)指的是一种查询可以跳过一个或多个分区对应的数据文件不进行读取的技术。假如你能安排你的查询从查询计划中剪除大量的不必要的分区,查询使用更少的资源,因此与剪除的不必要的分区成比例的变快,并且更可扩展(If you can arrange for queries to prune large numbers of unnecessary partitions from the query execution plan, the queries use fewer resources and are thus proportionally faster and more scalable)。

例如,如果一个表使用 YEAR, MONTH, DAY 分区,这样如 WHERE year = 2013, WHERE year < 2010, WHERE year BETWEEN 1995 AND 1998 等 WHERE 子句允许 Impala 除了指定范围的分区外,跳过所有其他分区的数据文件。同样的,WHERE year = 2013 AND month BETWEEN 1 AND 3 甚至可以剪除更多的分区,只读取一年中的一部分数据文件。

在执行查询之前,通过检查 EXPLAIN 查询的输出来检查查询分区修剪的效果。例如,下面例子中的表有 3 个分区,而查询只读取其中 1 个。EXOLAIN 计划中的标识符 #partitions=1/3 证明 Impala 可以进行对应的分区修剪。

  1. [localhost:21000] > insert into census partition (year=2010) values ('Smith'),('Jones');
  2. [localhost:21000] > insert into census partition (year=2011) values ('Smith'),('Jones'),('Doe');
  3. [localhost:21000] > insert into census partition (year=2012) values ('Smith'),('Doe');
  4. [localhost:21000] > select name from census where year=2010;
  5. +-------+
  6. | name  |
  7. +-------+
  8. | Smith |
  9. | Jones |
  10. +-------+
  11. [localhost:21000] > explain select name from census where year=2010;
  12. +------------------------------------------------------------------+
  13. | Explain String                                                   |
  14. +------------------------------------------------------------------+
  15. | PLAN FRAGMENT 0                                                  |
  16. |   PARTITION: UNPARTITIONED                                       |
  17. |                                                                  |
  18. |   1:EXCHANGE                                                     |
  19. |                                                                  |
  20. | PLAN FRAGMENT 1                                                  |
  21. |   PARTITION: RANDOM                                              |
  22. |                                                                  |
  23. |   STREAM DATA SINK                                               |
  24. |     EXCHANGE ID: 1                                               |
  25. |     UNPARTITIONED                                                |
  26. |                                                                  |
  27. |   0:SCAN HDFS                                                    |
  28. |      table=predicate_propagation.census #partitions=1/3 size=12B |
  29. +------------------------------------------------------------------+
复制代码
通过WHERE 子句中其他部分的中间属性,甚至在分区键列没有明确指定常量值的时候,Impala 都可以进行分区修剪(Impala can even do partition pruning in cases where the partition key column is not directly compared to a constant, by applying the transitive property to other parts of the WHERE clause)。这一技术称为谓词传播(predicate propagation),自 Impala 1.2.2 开始可用。在下面例子里,表 census 中包含另一个列存放数据收集的时间(是 10 年采集的)。即使分区键列 (YEAR) 没有对应一个常量, Impala 也可以推断只有 YEAR=2010 分区是必需的,并再次只读取了总分区中 1/3 个分区。

  1. [localhost:21000] > drop table census;
  2. [localhost:21000] > create table census (name string, census_year int) partitioned by (year int);
  3. [localhost:21000] > insert into census partition (year=2010) values ('Smith',2010),('Jones',2010);
  4. [localhost:21000] > insert into census partition (year=2011) values ('Smith',2020),('Jones',2020),('Doe',2020);
  5. [localhost:21000] > insert into census partition (year=2012) values ('Smith',2020),('Doe',2020);
  6. [localhost:21000] > select name from census where year = census_year and census_year=2010;
  7. +-------+
  8. | name  |
  9. +-------+
  10. | Smith |
  11. | Jones |
  12. +-------+
  13. [localhost:21000] > explain select name from census where year = census_year and census_year=2010;
  14. +------------------------------------------------------------------+
  15. | Explain String                                                   |
  16. +------------------------------------------------------------------+
  17. | PLAN FRAGMENT 0                                                  |
  18. |   PARTITION: UNPARTITIONED                                       |
  19. |                                                                  |
  20. |   1:EXCHANGE                                                     |
  21. |                                                                  |
  22. | PLAN FRAGMENT 1                                                  |
  23. |   PARTITION: RANDOM                                              |
  24. |                                                                  |
  25. |   STREAM DATA SINK                                               |
  26. |     EXCHANGE ID: 1                                               |
  27. |     UNPARTITIONED                                                |
  28. |                                                                  |
  29. |   0:SCAN HDFS                                                    |
  30. |      table=predicate_propagation.census #partitions=1/3 size=22B |
  31. |      predicates: census_year = 2010, year = census_year          |
  32. +------------------------------------------------------------------+
复制代码
在执行查询之后,立刻检查 PROFILE 语句的输出,了解实际读取和处理的数据量更详细的分析。
假如是在分区表上建立的视图,所有的分区修剪都是由原使得查询子句确定。即使在试图上的查询包含了引用分区键列的 WHERE 子句,Impala 不会修剪添加的列(If a view applies to a partitioned table, any partition pruning is determined by the clauses in the original query. Impala does not prune additional columns if the query on the view includes extra WHEREclauses referencing the partition key columns)。

分区键列

你选择的分区列应当是那种经常在重要的、大型的查询中过滤查询结果的列。通常来说,数据与时间值有关时,使用年、月、日的组合作为分区列,数据与一些位置有关时使用地理区域作为分区列。

  • 对于基于时间的数据,拆分出其中的各个部分到单独的列,因为 Impala 不能基于 TIMESTAMP 列进行分区
  • 分区列的数据类型对存储需求方面没有明显的影响,因为分区列的值不是存放在数据文件里,而是在 HDFS 目录名对应的字符串里
  • Remember that when Impala queries data stored in HDFS, it is most efficient to use multi-megabyte files to take advantage of the HDFS block size. 对于 Parquet 表,块大小 (数据文件理想大小) 是 1GB。因此,应避免指定太多分区键列,这样会导致个别分区只包含少量数据。例如,假如你每天获取 1GB 数据,你可能使用年、月、日进行分区;当你每分钟获取 5GB 数据时,你可能使用年、月、日、时、分来分区。假如你的数据保护地理组件,假如你每个邮编都有很多M的数据时,你可以基于邮编分区;假如不是,那么你可能需要使用更大的区域,如 city, state, 或 country. state 分区。

为分区设置不同的文件格式

分区表具有为不同的分区设置不同的文件格式的灵活性。例如,你原来是接收文本格式数据,然后是 RCFile 格式,最终会接收 Parquet 格式,所有这些数据可以存放在同一个表里进行查询。你只需要确保该表的结构是使用不同文件格式的的数据文件分别在单独的分区。

例如,下面是当你收到不同年份的数据时,你可能从文本切换到 Parquet:
  1. [localhost:21000] > create table census (name string) partitioned by (year smallint);
  2. [localhost:21000] > alter table census add partition (year=2012); -- Text format;
  3. [localhost:21000] > alter table census add partition (year=2013); -- Text format switches to Parquet before data loaded;
  4. [localhost:21000] > alter table census partition (year=2013) set fileformat parquet;
  5. [localhost:21000] > insert into census partition (year=2012) values ('Smith'),('Jones'),('Lee'),('Singh');
  6. [localhost:21000] > insert into census partition (year=2013) values ('Flores'),('Bogomolov'),('Cooper'),('Appiah');
复制代码
如上所述,HDFS 目录 year=2012 包含文本格式数据文件,而 HDFS 目录 year=2013 包含 Parquet 数据文件。一如既往,当加载实际数据时,你应当使用 INSERT ... SELECT 或 LOAD DATA 来导入大批量的数据,而不是使用产生少量的对实际查询低效的文件的 INSERT ... VALUES 语句。对于其他的 Impala 无法本地创建的文件类型,你可以切换到 Hive 并执行 ALTER TABLE ... SET FILEFORMAT 语句,并在这里执行 INSERT 或 LOAD DATA 语句。当切换回 Impala 后,执行 REFRESH table_name 语句以便 Impala 感知到通过 Hive 添加的任意分区或新数据。

连接查询性能注意事项
涉及连接操作的查询通常比只引用单个表的查询更需要调整。连接查询结果集的最大大小是所有连接的表中行数的乘积。当连接几个百万或十亿记录的表时,任何过滤结果集的失误,或查询中其他的低效操作,都将会导致操作无法完成不得不取消 。

调整 Impala 连接查询的最简单的技术就是在参与连接的每个表上使用 COMPUTE STATS 语句采集统计信息,然后让 Impala 基于每一个表的大小、每一个列不同值的个数、等等信息自动的优化查询。COMPUTE STATS 语句和 连接优化(join optimization)是 Impala 1.2.2 引入的新功能。为了保证每个表上统计信息的精确,请在表加载数据之后执行 COMPUTE STATS 语句,并在因 INSERT, LOAD DATA, 添加分区等操作导致数据大幅变化之后再次执行。假如连接查询中所有表的统计信息不可用,或 Impala 选择的连接顺序不是最优,你可以通过在 SELECT 关键字之后立刻紧跟 STRAIGHT_JOIN 关键字,来覆盖自动的连接顺序优化。这时候,Impala 使用表在查询中出现的顺序来指导连接如何处理。首先是最大的表,然后是次大的,依此类推。术语"最大"和"最小"指中间结果集的大小,这些基于作为结果集一部分的每个表的行数和列数(The terms "largest" and "smallest" refers to the size of the intermediate result set based on the number of rows and columns from each table that are part of the result set)。例如,如果你连接了表sales 和 customers,查询可能是从产生了 5000 次购买的 100 个用户中查找结果集。这时候,你应该使用 SELECT ... FROM sales JOIN customers ..., 把 customers 放在右侧,因为在这个查询上下文中它更小。依赖于表的绝对和相对的大小,Impala 查询计划器在执行连接查询的不同技术之间进行选择。广播连接(Broadcast joins) 是默认方式,右侧的表被认为比左侧的表小,并且它的内容被发送到查询涉及到的其他节点上。替代的技术称作分割连接(partitioned join) (与分区表无关),更适用于近乎相同大小的大型表的连接。使用这一技术,每一个表的部分内容被发送到对应的其他节点,然后这些行的子集可以并行处理。广播和分区连接的选择仍然依赖于连接中所有表的可用的、使用 COMPUTE STATS 语句手机的统计信息。对查询执行 EXPLAIN 语句,查看该查询采用了哪种连接策略。如果你发现一个查询使用了广播连接,而你通过基准测试知道分割连接更高效,或者相反情况时,在查询上添加提示指定使用的精确的连接机制。参见 Hints 了解详细信息。

统计信息不可用时连接如何处理
假如连接中的一些表的表或列统计信息不可用,Impala 仍然使用可用的那部分信息重新排列表,包含可用统计信息的表放在连接的左侧,按照整体大小和基数降序排列(Tables with statistics are placed on the left side of the join order, in descending order of cost based on overall size and cardinality)。没有统计信息的表被认为大小为 0,也就是说,它们总是放置在连接查询的右侧。

Overriding Join Reordering with STRAIGHT_JOIN
假如因为过时的统计信息或意外的数据分布, Impala 连接查询很低效,你可以通过在 SELECT 关键字之后紧跟着 STRAIGHT_JOIN 关键字来重新排序连接的表,使的 Impala 高效。STRAIGHT_JOIN 关键字关闭 Impala 内部使用的连接子句的重新排序,并根据 查询中 join 子句中列出的顺序优化(The STRAIGHT_JOIN keyword turns off the reordering of join clauses that Impala does internally, and produces a plan that relies on the join clauses being ordered optimally in the query text)。这时,重写查询以便最大的表在最左侧,跟着是次大的,依此类推直到最小的表放在最右侧。

在下面的例子里,基于 BIG 表的子查询产生一个非常小的结果集,但是这个表仍被视为好像它是最大的并放置在连接顺序的第一位。为最后的连接子句使用 STRAIGHT_JOIN 关键字,防止最终的表重新排序,保持它作为最右边表的连接顺序(Using STRAIGHT_JOIN for the last join clause prevents the final table from being reordered, keeping it as the rightmost table in the join order)。


  1. select straight_join x from medium join small join (select * from big where c1 < 10) as big
  2.   where medium.id = small.id and small.id = big.id;
复制代码


Examples of Join Order Optimization下面的例子演示了10亿、2亿、1百万行表之间的连接(这时,表都是未分区的,使用 Parquet 格式)。最小的表是最大的表的一个子集,方便起见在唯一的 ID 列上进行连接。最小的表只包含其他表中列的一个子集。


  1. [localhost:21000] > create table big stored as parquet as select * from raw_data;
  2. +----------------------------+
  3. | summary                    |
  4. +----------------------------+
  5. | Inserted 1000000000 row(s) |
  6. +----------------------------+
  7. Returned 1 row(s) in 671.56s
  8. [localhost:21000] > desc big;
  9. +-----------+---------+---------+
  10. | name      | type    | comment |
  11. +-----------+---------+---------+
  12. | id        | int     |         |
  13. | val       | int     |         |
  14. | zfill     | string  |         |
  15. | name      | string  |         |
  16. | assertion | boolean |         |
  17. +-----------+---------+---------+
  18. Returned 5 row(s) in 0.01s
  19. [localhost:21000] > create table medium stored as parquet as select * from big limit 200 * floor(1e6);
  20. +---------------------------+
  21. | summary                   |
  22. +---------------------------+
  23. | Inserted 200000000 row(s) |
  24. +---------------------------+
  25. Returned 1 row(s) in 138.31s
  26. [localhost:21000] > create table small stored as parquet as select id,val,name from big where assertion = true limit 1 * floor(1e6);
  27. +-------------------------+
  28. | summary                 |
  29. +-------------------------+
  30. | Inserted 1000000 row(s) |
  31. +-------------------------+
  32. Returned 1 row(s) in 6.32s
复制代码
对于任意类型的性能测试,使用 EXPLAIN 语句查看将执行的查询是如何的昂贵(expensive)而不需要实际运行它,并且启用详细的 EXPLAIN 计划包含更详细的性能导向的信息:最有趣的计划行---展示了没有统计信息的连接的表--以黑体突出,Impala 无法正确的估算处理的每个阶段中涉及的行数,通常采用广播连接机制把其中之一的表的完整数据发送到各个节点上(Impala cannot make a good estimate of the number of rows involved at each stage of processing,and is likely to stick with the BROADCAST join mechanism that sends a complete copy of one of the tables to each node)。
  1. [localhost:21000] > set explain_level=verbose;
  2. EXPLAIN_LEVEL set to verbose
  3. [localhost:21000] > explain select count(*) from big join medium where big.id = medium.id;
  4. +----------------------------------------------------------+
  5. | Explain String                                           |
  6. +----------------------------------------------------------+
  7. | Estimated Per-Host Requirements: Memory=2.10GB VCores=2  |
  8. |                                                          |
  9. | PLAN FRAGMENT 0                                          |
  10. |   PARTITION: UNPARTITIONED                               |
  11. |                                                          |
  12. |   6:AGGREGATE (merge finalize)                           |
  13. |   |  output: SUM(COUNT(*))                               |
  14. |   |  cardinality: 1                                      |
  15. |   |  per-host memory: unavailable                        |
  16. |   |  tuple ids: 2                                        |
  17. |   |                                                      |
  18. |   5:EXCHANGE                                             |
  19. |      cardinality: 1                                      |
  20. |      per-host memory: unavailable                        |
  21. |      tuple ids: 2                                        |
  22. |                                                          |
  23. | PLAN FRAGMENT 1                                          |
  24. |   PARTITION: RANDOM                                      |
  25. |                                                          |
  26. |   STREAM DATA SINK                                       |
  27. |     EXCHANGE ID: 5                                       |
  28. |     UNPARTITIONED                                        |
  29. |                                                          |
  30. |   3:AGGREGATE                                            |
  31. |   |  output: COUNT(*)                                    |
  32. |   |  cardinality: 1                                      |
  33. |   |  per-host memory: 10.00MB                            |
  34. |   |  tuple ids: 2                                        |
  35. |   |                                                      |
  36. |   2:HASH JOIN                                            | |   |  join op: INNER JOIN (BROADCAST)                     | |   |  hash predicates:                                    |
  37. |   |    big.id = medium.id                                | |   |  cardinality: unavailable                            | |   |  per-host memory: 2.00GB                             |
  38. |   |  tuple ids: 0 1                                      |
  39. |   |                                                      |
  40. |   |----4:EXCHANGE                                        |
  41. |   |       cardinality: unavailable                       |
  42. |   |       per-host memory: 0B                            |
  43. |   |       tuple ids: 1                                   |
  44. |   |                                                      |
  45. |   0:SCAN HDFS                                            | |      table=join_order.big #partitions=1/1 size=23.12GB   |
  46. |      table stats: unavailable                            |
  47. |      column stats: unavailable                           |
  48. |      cardinality: unavailable                            | |      per-host memory: 88.00MB                            |
  49. |      tuple ids: 0                                        |
  50. |                                                          |
  51. | PLAN FRAGMENT 2                                          |
  52. |   PARTITION: RANDOM                                      |
  53. |                                                          |
  54. |   STREAM DATA SINK                                       |
  55. |     EXCHANGE ID: 4                                       |
  56. |     UNPARTITIONED                                        |
  57. |                                                          |
  58. |   1:SCAN HDFS                                            | |      table=join_order.medium #partitions=1/1 size=4.62GB |
  59. |      table stats: unavailable                            |
  60. |      column stats: unavailable                           |
  61. |      cardinality: unavailable                            | |      per-host memory: 88.00MB                            |
  62. |      tuple ids: 1                                        |
  63. +----------------------------------------------------------+
  64. Returned 64 row(s) in 0.04s
复制代码
采集所有表的统计信息很简单,在每一个表上执行 COMPUTE STATS 语句:
  1. [localhost:21000] > compute stats small;
  2. +-----------------------------------------+
  3. | summary                                 |
  4. +-----------------------------------------+
  5. | Updated 1 partition(s) and 3 column(s). |
  6. +-----------------------------------------+
  7. Returned 1 row(s) in 4.26s
  8. [localhost:21000] > compute stats medium;
  9. +-----------------------------------------+
  10. | summary                                 |
  11. +-----------------------------------------+
  12. | Updated 1 partition(s) and 5 column(s). |
  13. +-----------------------------------------+
  14. Returned 1 row(s) in 42.11s
  15. [localhost:21000] > compute stats big;
  16. +-----------------------------------------+
  17. | summary                                 |
  18. +-----------------------------------------+
  19. | Updated 1 partition(s) and 5 column(s). |
  20. +-----------------------------------------+
  21. Returned 1 row(s) in 165.44s
复制代码
有了统计信息,Impala 可以选择更有效的连接顺序而不是按照查询中从左到右各个表的顺序,并且可以基于表的大小和行数选择广播连接或分割连接策略:
  1. [localhost:21000] > explain select count(*) from medium join big where big.id = medium.id;
  2. Query: explain select count(*) from medium join big where big.id = medium.id
  3. +-----------------------------------------------------------+
  4. | Explain String                                            |
  5. +-----------------------------------------------------------+
  6. | Estimated Per-Host Requirements: Memory=937.23MB VCores=2 |
  7. |                                                           |
  8. | PLAN FRAGMENT 0                                           |
  9. |   PARTITION: UNPARTITIONED                                |
  10. |                                                           |
  11. |   6:AGGREGATE (merge finalize)                            |
  12. |   |  output: SUM(COUNT(*))                                |
  13. |   |  cardinality: 1                                       |
  14. |   |  per-host memory: unavailable                         |
  15. |   |  tuple ids: 2                                         |
  16. |   |                                                       |
  17. |   5:EXCHANGE                                              |
  18. |      cardinality: 1                                       |
  19. |      per-host memory: unavailable                         |
  20. |      tuple ids: 2                                         |
  21. |                                                           |
  22. | PLAN FRAGMENT 1                                           |
  23. |   PARTITION: RANDOM                                       |
  24. |                                                           |
  25. |   STREAM DATA SINK                                        |
  26. |     EXCHANGE ID: 5                                        |
  27. |     UNPARTITIONED                                         |
  28. |                                                           |
  29. |   3:AGGREGATE                                             |
  30. |   |  output: COUNT(*)                                     |
  31. |   |  cardinality: 1                                       |
  32. |   |  per-host memory: 10.00MB                             |
  33. |   |  tuple ids: 2                                         |
  34. |   |                                                       |
  35. |   2:HASH JOIN                                             |
  36. |   |  join op: INNER JOIN (BROADCAST)                      |
  37. |   |  hash predicates:                                     |
  38. |   |    big.id = medium.id                                 |
  39. |   |  cardinality: 1443004441                              |
  40. |   |  per-host memory: 839.23MB                            |
  41. |   |  tuple ids: 1 0                                       |
  42. |   |                                                       |
  43. |   |----4:EXCHANGE                                         |
  44. |   |       cardinality: 200000000                          |
  45. |   |       per-host memory: 0B                             |
  46. |   |       tuple ids: 0                                    |
  47. |   |                                                       |
  48. |   1:SCAN HDFS                                             |
  49. |      table=join_order.big #partitions=1/1 size=23.12GB    |
  50. |      table stats: 1000000000 rows total                   |
  51. |      column stats: all                                    |
  52. |      cardinality: 1000000000                              |
  53. |      per-host memory: 88.00MB                             |
  54. |      tuple ids: 1                                         |
  55. |                                                           |
  56. | PLAN FRAGMENT 2                                           |
  57. |   PARTITION: RANDOM                                       |
  58. |                                                           |
  59. |   STREAM DATA SINK                                        |
  60. |     EXCHANGE ID: 4                                        |
  61. |     UNPARTITIONED                                         |
  62. |                                                           |
  63. |   0:SCAN HDFS                                             |
  64. |      table=join_order.medium #partitions=1/1 size=4.62GB  |
  65. |      table stats: 200000000 rows total                    |
  66. |      column stats: all                                    |
  67. |      cardinality: 200000000                               |
  68. |      per-host memory: 88.00MB                             |
  69. |      tuple ids: 0                                         |
  70. +-----------------------------------------------------------+
  71. Returned 64 row(s) in 0.04s
  72. [localhost:21000] > explain select count(*) from small join big where big.id = small.id;
  73. Query: explain select count(*) from small join big where big.id = small.id
  74. +-----------------------------------------------------------+
  75. | Explain String                                            |
  76. +-----------------------------------------------------------+
  77. | Estimated Per-Host Requirements: Memory=101.15MB VCores=2 |
  78. |                                                           |
  79. | PLAN FRAGMENT 0                                           |
  80. |   PARTITION: UNPARTITIONED                                |
  81. |                                                           |
  82. |   6:AGGREGATE (merge finalize)                            |
  83. |   |  output: SUM(COUNT(*))                                |
  84. |   |  cardinality: 1                                       |
  85. |   |  per-host memory: unavailable                         |
  86. |   |  tuple ids: 2                                         |
  87. |   |                                                       |
  88. |   5:EXCHANGE                                              |
  89. |      cardinality: 1                                       |
  90. |      per-host memory: unavailable                         |
  91. |      tuple ids: 2                                         |
  92. |                                                           |
  93. | PLAN FRAGMENT 1                                           |
  94. |   PARTITION: RANDOM                                       |
  95. |                                                           |
  96. |   STREAM DATA SINK                                        |
  97. |     EXCHANGE ID: 5                                        |
  98. |     UNPARTITIONED                                         |
  99. |                                                           |
  100. |   3:AGGREGATE                                             |
  101. |   |  output: COUNT(*)                                     |
  102. |   |  cardinality: 1                                       |
  103. |   |  per-host memory: 10.00MB                             |
  104. |   |  tuple ids: 2                                         |
  105. |   |                                                       |
  106. |   2:HASH JOIN                                             |
  107. |   |  join op: INNER JOIN (BROADCAST)                      |
  108. |   |  hash predicates:                                     |
  109. |   |    big.id = small.id                                  |
  110. |   |  cardinality: 1000000000                              |
  111. |   |  per-host memory: 3.15MB                              |
  112. |   |  tuple ids: 1 0                                       |
  113. |   |                                                       |
  114. |   |----4:EXCHANGE                                         |
  115. |   |       cardinality: 1000000                            |
  116. |   |       per-host memory: 0B                             |
  117. |   |       tuple ids: 0                                    |
  118. |   |                                                       |
  119. |   1:SCAN HDFS                                             |
  120. |      table=join_order.big #partitions=1/1 size=23.12GB    |
  121. |      table stats: 1000000000 rows total                   |
  122. |      column stats: all                                    |
  123. |      cardinality: 1000000000                              |
  124. |      per-host memory: 88.00MB                             |
  125. |      tuple ids: 1                                         |
  126. |                                                           |
  127. | PLAN FRAGMENT 2                                           |
  128. |   PARTITION: RANDOM                                       |
  129. |                                                           |
  130. |   STREAM DATA SINK                                        |
  131. |     EXCHANGE ID: 4                                        |
  132. |     UNPARTITIONED                                         |
  133. |                                                           |
  134. |   0:SCAN HDFS                                             |
  135. |      table=join_order.small #partitions=1/1 size=17.93MB  |
  136. |      table stats: 1000000 rows total                      |
  137. |      column stats: all                                    |
  138. |      cardinality: 1000000                                 |
  139. |      per-host memory: 32.00MB                             |
  140. |      tuple ids: 0                                         |
  141. +-----------------------------------------------------------+
  142. Returned 64 row(s) in 0.03s
复制代码
当类似这些的查询实际运行时,执行时间是相对固定的,不管查询语句中表的顺序如何。下面的例子使用了唯一的 ID 列和包含重复值的 VAL 列:
  1. [localhost:21000] > select count(*) from big join small on (big.id = small.id);
  2. Query: select count(*) from big join small on (big.id = small.id)
  3. +----------+
  4. | count(*) |
  5. +----------+
  6. | 1000000  |
  7. +----------+
  8. Returned 1 row(s) in 21.68s
  9. [localhost:21000] > select count(*) from small join big on (big.id = small.id);
  10. Query: select count(*) from small join big on (big.id = small.id)
  11. +----------+
  12. | count(*) |
  13. +----------+
  14. | 1000000  |
  15. +----------+
  16. Returned 1 row(s) in 20.45s
  17. [localhost:21000] > select count(*) from big join small on (big.val = small.val);
  18. +------------+
  19. | count(*)   |
  20. +------------+
  21. | 2000948962 |
  22. +------------+
  23. Returned 1 row(s) in 108.85s
  24. [localhost:21000] > select count(*) from small join big on (big.val = small.val);
  25. +------------+
  26. | count(*)   |
  27. +------------+
  28. | 2000948962 |
  29. +------------+
  30. Returned 1 row(s) in 100.76s
复制代码
Note: 当检测连接查询的性能和连接顺序优化的有效性时,请确保查询涉及到足够的数据和集群资源,以便能在查询计划中看出不同。例如,只有几兆大小的单个数据文件将会存放在一个 HDFS 块里,并只被单个节点处理。同样的,假如你使用单节点或两个节点的集群,广播连接和分割连接策略的效率可能没什么分别。


Impala 如何使用统计信息进行查询优化
当统计信息可用时,Impala 可以更好的优化复杂的或多表查询,可以更好地理解数据量和值的分布,并使用这些信息帮助查询并行处理和分布负载。下面的章节描述了 Impala 可以使用的统计信息的分类,以及如何产生这些信息并保持最新。

原来 Impala 依靠 Hive 采集统计信息的机制,通过 Hive ANALYZE TABLE 语句初始化一个 MapReduce 作业进行。为了更好的性能、用户友好性和可靠性, 在 1.2.1 之后,Impala 实现了自己的 COMPUTE STATS 语句,以及相关的 SHOW TABLE STATS 和 SHOW COLUMN STATS 语句。


表统计信息
当 metastore 数据库中的元数据可用时,Impala 查询计划器可以使用整个表和分区的统计信息。这些元数据用于本表的某些优化,并和列统计信息组合用于其他优化。
当向表或分区加载数据加载数据后,使用以下技术之一采集表的统计信息:
  • 在 Impala 中执行 COMPUTE STATS 语句。这一在 Impala 1.2.2 新引入的语句是首选方法,因为:
    • 它在单个操作中采集表、表的所有分区和所有列的统计信息
    • 它不依赖于任意特殊的 Hive 设置、 metastore 配置、或单独的数据库来存放统计信息
    • 它使用 Impala 查询基础架构来计算行数、不同值个数等等,通常比用 Hive ANALYZE TABLE statement.
  • 当 Hive 中设置 hive.stats.autogather 为启用时,通过 Hive INSERT OVERWRITE 语句加载数据
  • 为整个表或特定分区在 Hive 中执行 ANALYZE TABLE 语句:
  1. ANALYZE TABLEtablename[PARTITION(partcol1[=val1],partcol2[=val2], ...)] COMPUTE STATISTICS [NOSCAN];
复制代码
例如,为非分区表采集统计信息:
  1. ANALYZE TABLE customer COMPUTE STATISTICS;
复制代码
为以 state 和 city 分区列的分区表 store 表采集所有分区的统计信息:
  1. ANALYZE TABLE store PARTITION(s_state, s_county) COMPUTE STATISTICS;
复制代码
只采集分区表 store 中 California 分区的统计信息:
  1. ANALYZE TABLE store PARTITION(s_state='CA', s_county) COMPUTE STATISTICS;
复制代码
使用 SHOW TABLE STATS table_name 语句,查看表的统计信息是否可用,以及统计信息的详细内容。参考 SHOW Statement 了解详细信息。
假如你使用基于 Hive 的方法采集统计信息,参见 the Hive wiki 了解关于 Hive 的配置要求。 Cloudera 推荐使用 Impala COMPUTE STATS 语句以避免 Hive 采集统计信息程序潜在的配置和可扩展性方面的问题。

列统计信息
当 metastore 数据库中的元数据可用时,Impala 查询计划器可以使用单个列的统计信息。这一技术对于比较连接查询中所有表的连接列,以帮助评估查询中每一个表将返回多少行最有价值。目前 Impala 自身不会自动创建这些元数据。使用 Hive 中的 ANALYZE TABLE 语句收集这些统计信息(无论表是在 Impala 还是 Hive 中创建的,这一语句都可以正常工作)。

Note:Impala 中列统计信息很重要,但是对于应用表,你也同样需要表的统计信息,像在 表统计信息 中描述的那样。假如你使用 Impala COMPUTE STATS 语句,表和表中所有列的统计信息都会自动同时收集。


对于特定的一组列,使用 SHOW COLUMN STATS table_name 语句检查列统计信息是否可用,或检查针对引用这系列的表的查询的扩展的 EXPLAIN 输出。参见
SHOW 语句EXPLAIN 语句了解详细信息。

通过 ALTER TABLE 手工设置统计信息
所有统计信息中最关键的部分是表(未分区的表)或分区(分区表)中的行数。COMPUTE STATS 语句总是采集所有列的统计信息以及整个表的统计信息。假如在添加了一个分区或插入数据之后,进行完整的 COMPUTE STATS 操作实际不可行时,或者当行数不同时,可以预见 Impala 将产生更好的执行计划时,你可以通过 ALTER TABLE 语句手工设置行数:

  1. create table analysis_data stored as parquet as select * from raw_data;
  2. Inserted 1000000000 rows in 181.98s
  3. compute stats analysis_data;
  4. insert into analysis_data select * from smaller_table_we_forgot_before;
  5. Inserted 1000000 rows in 15.32s
  6. -- 现在表里共有 1001000000 行。我们可以更新统计信息中的这一个数据点
  7. alter table analysis_data set tblproperties('numRows'='1001000000');
复制代码


对于分区表,同时更新每一个分区的行数和整个表的行数:

  1. -- 如果原来表中包含 1000000 行,我们新添加了一个分区
  2. -- 修改该分区和整个表的 numRows 属性
  3. alter table partitioned_data partition(year=2009, month=4) set tblproperties ('numRows'='30000');
  4. alter table partitioned_data set tblproperties ('numRows'='1030000');
复制代码
实际上,COMPUTE STATS 语句已经够快了,这一技术是不必要的。这一方法最大的价值就是可以调整 numRows 值的大小来产生理想的连接顺序从而解决性能问题(It is most useful as a workaround for in case of performance issues where you might adjust the numRowsvalue higher or lower to produce the ideal join order)。

下一篇:Impala性能调整二



已有(1)人评论

跳转到指定楼层
韩克拉玛寒 发表于 2014-10-8 09:18:26
谢谢群主的分享
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条