对于一个工程师或者分析师来说,如何查询和分析TB/PB级别的数据是在大数据时代不 可回避的问题。SQL on Hadoop就成为了一个重要的工具。为什么非要把SQL放到Hadoop上? SQL易于使用;那为什么非得基于Hadoop呢?Hadoop架构具备很强的鲁棒性和可扩展性。本文从技术架构和最新进展两个角度分析一下各种SQL on Hadoop产品的优缺点和适用范围:Hive、Tez/Stinger、Impala、Shark/Spark、Phoenix、 Hdapt/HadoopDB、Hawq/Greenplum。 在互联网企业和有大数据处理需求的传统企业中,基于Hadoop构建的数据仓库的数据来源主要有以下几个: 通过Flume/Scribe/Chukwa这样的日志收集和分析系统把来自Apache/Nginx的日志收集到HDFS上,然后通过Hive查询。 通过Sqoop这样的工具把用户和业务维度数据(一般存储在Oracle/MySQL中)定期导入Hive,那么OLTP数据就有了一个用于OLAP的副本了。 通过ETL工具从其他外部DW数据源里导入的数据。 目前所有的SQL on Hadoop产品其实都是在某个或者某些特定领域内适合的,没有silver bullet。像当年Oracle/Teradata这样的满足几乎所有企业级应用的产品在大数据时代是不现实的。所以每一种SQL on Hadoop产品都在尽量满足某一类应用的特征。典型需求: interactive query (ms~3min) data analyst,reporting query (3min~20min) data mining,modeling and large ETL (20 min ~ hr ~ day) 机器学习需求(通过MapReduce/MPI/Spark等计算模型来满足) Hive Hive是目前互联网企业中处理大数据、构建数据仓库最常用的解决方案,甚至在很多公司部署了Hadoop集群不是为了跑原生MapReduce程序,而全用来跑Hive SQL的查询任务。 对于有很多data scientist和analyst的公司,会有很多相同表的查询需求。那么显然每个人都从Hive中查数据速度既慢又浪费资源。如果能把经常访问的数据放到内存组成的集群中供用户查询那样效率就会高很多。Facebook针对这一需求开发了Presto,一个把热数据放到内存中供SQL查询的系统。这个设计思路跟Impala和Stinger非常类似了。使用Presto进行简单查询只需要几百毫秒,即使是非常复杂的查询,也只需数分钟即可完成,它在内存中运行,并且不会向磁盘写入。Facebook有超过850名工程师每天用它来扫描超过320TB的数据,满足了80%的ad-hoc查询需求。 目前Hive的主要缺点: data shuffle时网络瓶颈,Reduce要等Map结束才能开始,不能高效利用网络带宽。 一般一个SQL都会解析成多个MR job,Hadoop每次Job输出都直接写HDFS,大量磁盘IO导致性能比较差。 每次执行Job都要启动Task,花费很多时间,无法做到实时。 由于把SQL转化成MapReduce job时,map、shuffle和reduce所负责执行的SQL解析出得功能不同。那么就有Map->MapReduce或者 MapReduce->Reduce这样的需求,这样可以降低写HDFS的IO数量,从而提高性能。但是目前MapReduce框架还不支持 M->MR或者MR->R这样的任务执行。 目前Hive主要的改进(主要是体现在 Hive 0.11版本上): 1. 同一条hive sql解析出的多个MR任务的合并。由Hive解析出来的MR jobs中有非常多的Map->MapReduce类型的job,可以考虑把这个过程合并成一个MRjob。 https://issues.apache.org/jira/browse/HIVE-3952 2. Hive query optimizer(查询优化器是Hive需要持续不断优化的一个topic) 例如JOIN顺序的优化,就是原来一个大表和多个小表在不同column匹配的条件下JOIN需要解析成多个Map join + MR job,现在可以合并成一个MR job。 这个改进方向要做的就是用户不用给太多的hint,hive可以自己根据表的大小、行数等,自动选择最快的join的方法(小表能装进内存的话 就用Map join,Map join能和其他MR job合并的就合并)。这个思路跟cost-based query optimizer有点类似了,用户写出来的SQL在翻译成执行计划之前要计算那种执行方式和JOIN顺序效率更高。 3. ORCFile ORCFile是一种列式存储的文件,对于分析型应用来说列存有非常大的优势。 原来的RCFile中把每一列看成binary blob,没有任何语义,所以只能用通用的zlib,LZO,Snappy等压缩方法。ORCFile能够获取每一列的类型(int还是string), 那么就可以使用诸如dictionary encoding, bit packing, delta encoding, run-length encoding等轻量级的压缩技术。这种压缩技术的优势有两点:一是提高压缩率;二是能够起到过滤无关数据的效果。 Predicate Pushdown:原来的Hive是把所有的数据都读到内存中,然后再判断哪些是符合查询需求的。在ORCFile中数据以Stripe为单元读取到内 存,那么ORCFile的RecordReader会根据Stripe的元数据(Index Data,常驻内存)判断该Stripe是否满足这个查询的需求,如果不满足直接略过不读,从而节省了IO。 通过对ORCFile的上述分析,我想大家已经看到了brighthouse的影子了吧。都是把列数据相应的索引、统计数据、词典等放到内存中参与查询条件的过滤,如果不符合直接略过不读,大量节省IO。 4. HiveServer2的Security和Concurrency特性 HiveServer2能够支持并发客户端(JDBC/ODBC)的访问。 Cloudera还搞了个Sentry用于Hadoop生态系统的的安全性和授权管理方面的工作。这两个特点是企业级应用Hadoop/Hive主要关心的。 5. HCatalog Hadoop的统一元数据管理平台 目前Hive存储的表格元数据和HDFS存储的表格数据之间在schema上没有一致性保证,也就是得靠管理员来保证。目前Hive对列的改变 只会修改 Hive 的元数据,而不会改变实际数据。比如你要添加一个column,那么你用Hive命令行只是修改了了Hive元数据,没有修改HDFS上存储的格式。还得 通过修改导入HDFS的程序来改变HDFS上存储的文件的格式。Hadoop系统目前对表的处理是’schema on read’,有了HCatlog就可以做到EDW的’schema on write’。 6. Windowing and Analytics Functions的支持。 Tez/Stinger Tez是一种新的基于YARN的DAG计算模型,主要是为了优化Hive而设计的。目前Tez/Stinger主要是Hortonworks在 搞,他们希望以后把Hive SQL解析成能够在Tez上跑的DAG而不是MapReduce,从而解决计算实时性的问题。Tez的主要特点有: 底层执行引擎不再使用MR,而是使用基于YARN的更加通用的DAG执行引擎,MR是高度抽象的Map和Reduce两个操作,而Tez则是在这两个操作的基础上提供了更丰富的接口。把Map具体到Input、 Processor、 Sort、Merge、Output,而Reduce也具体化成Input、Shuffle、Sort、Merge、Processor、 Output。其实这个跟Spark有点类似了,都是提供更丰富的可操作单元给用户。 传统的Reduce只能输出到HDFS,而Tez的Reduce Processor能够输出给下一个Reduce Processor作为输入。 Hot table也放到内存中cache起来 Tez service:预启动container和container重用,降低了每次Query执行计划生成之后Task启动的时间,从而提高实时性。 Tez本身只是YARN框架下得一个library,无需部署。只需指定mapreduce.framework.name=yarn-tez Tez/Stinger还有一个最重要的feature : Vectorized Query Execution ( 该feature在HDP 2.0 GA中会提供)。 目前Hive中一行一行的处理数据,然后调用lazy deserialization解析出该列的Java对象,显然会严重影响效率。Vectorized Query Execution把多行数据同时读取并处理(基本的比较或者数值计算),降低了函数调用的次数,提高了CPU利用率和cache命中率。 Hive->Tez/Stinger未来工作的主要方向:Cost-based optimizer,基于统计选择执行策略,例如多表JOIN时按照怎样的顺序执行效率最高。统计执行过程中每个中间表的Row/Column等数目,从而决定启动多少个MR执行。 Impala Impala可以看成是Google Dremel架构和MPP (Massively Parallel Processing)结构的混合体,目前主要是Cloudera在主导这个项目。 优点: 目前支持两种类型的JOIN:broadcast join和partition join。对于大表JOIN时由于内存限制,装不下时就要dump部分数据到磁盘,那样就会比较慢。 Impala各个任务之间传输数据采用的是push的方式(MR采用的是pull的方式),也就是上游任务计算完就会push到下游,这样能够分散网络压力,提高job执行效率。 Parquet列存格式,同时能够处理嵌套数据。通过嵌套数据以及扩展的SQL查询语义,在某些特定的场景上避开了JOIN从而解决了一部分性能的bottleneck。 Cloudera Manager 4.6以后会有slow query的分析功能。 Runtime Code Generation 缺点: Impala不会按照group by的列排序 目前不支持UDF,Impala 1.2即将支持Hive UDFs和Impala native UDFs and UDAs 不支持像Hive的Serializer/Deserializer,从而使得它做从非结构化到结构化数据的ETL工作比较麻烦。所以本质上讲Impala适合MR配合做ETL之后的查询工作。 由于Impala的设计初衷是short query,所以不支持fault tolerance。如果参与查询的某个node出错,Impala将会丢弃本次查询。 安全方面的支持还比较差。impalad之间传输的数据没有加密,不支持表或者列级别的授权。 每个PlanFragment执行尽量并行化,但是有的时候并不是很容易。例如Hash Join需要等到其中一个表完全Scan结束才能开始。 虽然有这么多缺点,但是很多公司还是开始尝试Impala了。以百度为 例,百度尝试把MySQL接入Impala的后端作为存储引擎,同时实现相应操作对应的PlanFragment,那么用户来的query还是按照原来的 解析方法解析成各种PlanFragment,然后直接调度到对应的节点(HDFS DataNode/HBaseRegionServer/MySQL)上执行。会把某些源数据或者中间数据放到MySQL中,用户的query涉及到使用 这部分数据时直接去MySQL里面拿。 Shark/Spark 由于数据能放到内存尽量放到内存,使用内存非常aggressive。优点是做JOIN时会比较快,缺点是占用内存太大,且自行管理内存,占用内存后不会释放。 由于Shark借用了Hive的codebase,所以在SQL,SerDes,UDF支持方面和Hive是完全兼容的。 支持从short query到long time query等不同粒度的查询,所以具有fault tolerance特性。 性能:特别简单的select…where查询,shark性能的提升不明显(因为hive也不怎么费时间)。但是如果查询比较复杂 select…join…where…group by,hive的job数目会比较多,读写HDFS次数增多,时间自然会变长。当内存还足够大的时候shark性能是最好的,如果内存不够装下所有的数据 时性能会下降,但还是会比Hive好很多。 Phoenix Salesforce开源的基于HBase的SQL查询系统。基本原理是将一个对于HBase client来说比较复杂的查询转换成一系列Region Scan,结合coprocessor和custom filter在多台Region Server上进行并行查询,汇总各个Scan结果。种种迹象表明,Phoenix应该不是个优化的OLAP系统,更像是一个用于简单单表查询,过滤,排 序,检索的OLTP系统。 优点: HBase默认存储的数据类型都是字符串,但Phoenix支持更多的数据类型。 使用JDBC操作数据,而不是HBase client API 在RegionServer端通过coprocessor过滤where条件,执行aggregation函数。比较Hive on HBase,Impala on HBase和Phoenix这三者的架构是相似的,不同点就是Hive on HBase和Impala on HBase都没有把coprocessor利用好,都是通过HBase client API把数据读到他们自己进程的内存之后才进行的filter, aggregation等操作。 从查询的角度来看HBase的column主要分为两类:primary key(row key column)和other columns。主要的不同是row key column能够利用Region Server的index, filter, sort等特性,而other columns没有这些特性,只能通过二级索引辅助做一些优化。Phoenix能够在HBase上创建二级索引用于优化条件查询(目前只支持在 static table上建二级索引,一个更通用的HBase二级索引实现方法参考 https://github.com/Huawei-Hadoop/hindex)。 如果是row key column上的IN/OR/LIKE条件,可以通过Region Server的skip scan filter优化。 Dynamic columns支持。 AutoCommit=false时(默认是false)把所有操作先缓存在客户端,只有你显示commit时才一次批量提交到HBase,SQL解析优化全是在客户端做,这个有点事务的意思。 缺点: 不支持JOIN,考虑到HBase的设计初衷是尽量用冗余数据减少复杂的JOIN操作,实际上可以把相关数据都放在同一个表里,而不需要为了减少数据冗余,拆分到多个表中,所以很大程度也可以认为这不是一个缺点。 从架构上看也仅是把SQL转成HBase Client的API和coprocessor的调用,而且coprocessor还不适合大规模数据的传输,所以如果中间结果的数据量还是比较大的话性能问题还是很明显的。 这个缺点是所有的基于HBase的SQL系统都有的(包括Hive on HBase和Impala on HBase)。不管什么请求到HBase Region Server这边都得通过RegionScanner,这个接口不是面向OLAP型应用优化的存储文件读取接口。例如RegionScanner的实现里 好多条件比较,是不利于全表扫描的。 还有个问题就是coprocessor的问题了,由于coprocessor和HBase Region Server是在一个JVM里面,所以当coprocessor计算逻辑非常复杂,中间结果数据量很大的时候会占用大量内存。同时coprocessor 不是流式地读取数据,某些节点数据积累过多也会造成内存不够用的问题。 RoadMap: JOIN支持,虽然有点不符合设计初衷,但是大家都支持,就我不支持,太过时了吧。 Transaction支持,通过参考 https://github.com/yahoo/omid的方法。 Online Schema Evolution,动态改变column的类型,rename等。 Hadapt/HadoopDB 架构和Hive相似,底层存储引擎有两种:HDFS和RDBMS(PostgreSQL),一个DataNode节点上有一个RDBMS节点。 提供两种接口:SQL和MR,SQL也是解析成MR job来执行的,所以总的来说执行引擎都是MR。 把多个MR任务,转换成单node上的SQL+一个MR(data shuffle),这个跟水平压缩,垂直压缩类似,尽量减少SQL解析出的MR task个数,减少任务之间写HDFS的IO数据量。把一个SQL拆解成两部分:适合SQL做的用单机SQL,不适合的用MR(data shuffle) 和Hive的不同点在于Hive只能操控HDFS上的数据,而Hadapt可以操控HDFS和RDBMS两种数据来源。对于RDBMS这个数据 源来说,数据被预先load到分布式的RDBMS节点中,有统一的Catalog管理所有RDBMS中的数据。例如Map中的有些执行逻辑直接通过一个在 RDBMS上执行的SQL来获得(修改InputFormat),然后使用MR来做JOIN/Group By。而且如果在数据被load到分布式PG节点上时分布情况正好符合group by/order by的条件,那么还省得通过MR的shuffle来做了。 Hadapt的本质还是把SQL解析成MR任务来做,所以Hive的有些缺点(启动时间长,JOIN效率较低)它也是具有的。还有如果想要join/group by/order by能够在RDBMS数据源之间高效执行,还得考虑数据预分布的问题。 在执行多个查询的时候,后面的查询能够利用前面查询的查询结果(有点类似于数据仓库中的物化视图的概念),从而可以提高查询的性能。 现在企业级应用大多使用的方案是Hadoop+MPP的方式,即通过Hadoop批处理非结构化数据(进行ETL操作)然后通过 connector导入MPP进行结构化数据的查询操作。但是这只是临时的替代方案,Hadapt说invisible loading才是最合理的,这样企业就有了一个统一分析平台。 Hawq 原来GPDB中的存储是本地磁盘,现在改成HDFS,原来GPDB的单节点的RDBMS只充当执行引擎的功能,不再充当存储引擎功能。 查询执行通过GPDB的并行执行引擎(不再使用MR),每次查询开始把数据从HDFS中导入到GPDB,执行过程中通过内存交换数据而非MR那样每次任务结束都写磁盘。 GP特有的cost-based parallel query optimizer and planner是它的一大优势,也是目前其他大多数的产品中没有的,它能够帮用户选出该SQL最高效的执行顺序。 使用GPDB充当执行引擎的好处:标准SQL兼容;支持ACID事务;JDBC/ODBC支持;JOIN顺序优化和索引支持(查询优化器);支持行/列两种存储格式。 GPXF使得Hawq能够读取存储在HDFS上的任何格式的数据以及存储在其他文件系统和设备中的数据。 底层的HDFS需要支持trancate语义和native C interface。 支持In-Database analytics ( http://madlib.net/ ): 性能相关: Scott Yara(Greenplum老大)公开承认Hawq比pure GPDB要慢。这么做的目的无非就是更好的利用HDFS的可扩展性,统一存储管理。 和其他SQL on Hadoop产品的性能对比方面,Hawq在group by和join操作上与其他方案相比优势明显,前提是数据量不是特别大。(是不是因为数据导入的时候partition做的好呢,是不是拿load的时间 换group by/join的时间呢?) 总之,目前在SQL on Hadoop领域普遍比较薄弱的环节是: 1. workload management and query optimization多个表的JOIN如何执行,例如3个表的JOIN会有6种执行策略,那么哪一种才是效率最高的呢。显然要通过计算每种执行顺序的开销来获得。在传统数据库或者数据仓库领域都有非常好的查询优化器,而在分布式系统中该如何衡量这些指标(磁盘IO,网络带宽,内存)与最后查询效率之间的关系是个需要认真研究的问题。 2. 关联子查询correlated sub-queries还是没有谁能够实现。在TPC-H中又很多关联子查询的例子,但是现在的SQL on Hadoop产品都不支持。听Impala的人说,他们客户对这个的需求不是很强烈,大部分关联子查询可以转化成JOIN操作。但是目前的商业产品像 Hawq是支持关联子查询的。 除了上面主要讨论的开源产品以外,大数据分析领域还有很多商业产品。这些商业产品可以分为两类:一类是面向企业级应用的、以卖license或软硬件一体机形式出售的Teradata/Aster Data, HP/Vertica, SAP/HANA,IBM/BigSQL, Oracle和Microsoft也有类似的产品;另一类是利用大规模云计算基础设施,提供的数据分析服务的Google/BigQuery(典型的Analysis as a Service)和Amazon/Redshif。
|