本帖最后由 pig2 于 2018-11-5 12:17 编辑
问题导读
1.什么是Hive hooks?
2.什么是数据湖?
3.Hive钩子有哪些种类?
4.如何实现Hive钩子?
关注最新经典文章,欢迎关注公众号
今天讲的是Hive hooks(钩子).先简单普及下什么是Hooks。
########################################
内容补充
通常,Hook是一种在处理过程中拦截事件,消息或函数调用的机制。 Hive hooks是绑定到了Hive内部的工作机制,无需重新编译Hive。从这个意义上讲,提供了使用hive扩展和集成外部功能的能力。换句话说,Hive hadoop可用于在查询处理的各个步骤中运行/注入一些代码。
更多内容:
根据钩子的类型,它可以在查询处理期间的不同点调用:
Pre-execution hooks-在执行引擎执行查询之前,将调用Pre-execution hooks。请注意,这个目的是此时已经为Hive准备了一个优化的查询计划。
Post-execution hooks -在查询执行完成之后以及将结果返回给用户之前,将调用Post-execution hooks 。
Failure-execution hooks -当查询执行失败时,将调用Failure-execution hooks 。
Pre-driver-run 和post-driver-run hooks-在driver执行查询之前和之后调用Pre-driver-run 和post-driver-run hooks。
Pre-semantic-analyzer 和 Post-semantic-analyzer hooks-在Hive在查询字符串上运行语义分析器之前和之后调用Pre-semantic-analyzer 和Post-semantic-analyzer hooks。
什么是数据湖:
数据湖是一种在系统或存储库中以自然格式存储数据的方法,它有助于以各种模式和结构形式配置数据,通常是对象块或文件。数据湖的主要思想是对企业中的所有数据进行统一存储,从原始数据(这意味着源系统数据的精确副本)转换为用于报告、可视化、分析和机器学习等各种任务的转换数据。湖中的数据包括结构化数据从关系数据库(行和列),半结构化数据(CSV、XML、JSON的日志),非结构化数据(电子邮件,文档,PDF)和二进制数据(图像、音频、视频)从而形成一个集中式数据存储容纳所有形式的数据。
数据湖的核心思想是把不同结构的数据统一存储,使不同数据有一致的存储方式,在使用时方便连接,真正解决数据集成问题。
数据湖泊和数据仓库的区别,主要就是数据仓库的数据进入这个池之前是预先分类的,这可以指导其后面如何进行数据的分析。但在大数据时代,这些都是素材而已,你根本不知道以后如何用它。也就是数据湖泊给后面的数据分析带来了更大的弹性。因此,这个放大数据的仓库,专家建议叫数据湖泊,以区别于数据仓库。
########################################
下面我们开始讲Hive Hooks优化数据湖
关于数据的数据
数据湖泊(lakes )和数据沼泽(swamps )之间的重要区别在于,组织良好的数据可以形成高效的湖泊​​,而沼泽只是数据过度复制或被用户孤立的数据。获取有关如何跨组织使用生产数据的信息不仅有利于构建组织良好的数据湖,而且还有助于数据工程师微调数据管道或数据本身。
要了解数据的消耗方式,我们需要找出一些基本问题的答案,例如:
经常访问哪些数据集(表/视图/数据库)?
查询何时运行最频繁?
哪些用户或应用程序正在大量使用这些资源?
什么类型的查询经常运行?
访问最多的对象可以轻松地受益于压缩,列式文件格式或数据分解等优化。可以为利用资源的应用程序或用户分配单独的队列,以平衡群集上的负载。群集资源可以在时间范围内按比例放大,此时大多数查询主要用于满足SLA并在低使用率期间按比例缩小以节省成本。
Hive Hooks是回答上述问题!
钩子(Hooks)
钩子是一种允许修改程序行为的机制。 它是一种拦截应用程序中的函数调用,消息或事件的技术。 Hive提供了许多不同类型的钩子,上文补充内容已经列出来。
可以在特定事件中调用每种类型的挂钩,并且可以根据用例自定义以执行不同的操作。 例如,在执行物理查询计划之前调用预执行挂钩,并在向job.xml提交查询以编辑敏感信息之前调用redactor挂钩。 Apache Atlas拥有最流行的Hive钩子实现之一,它可以监听Hive中的创建/更新/删除操作,并通过Kafka通知更新Atlas中的元数据。
Implementation
Pre-execution hooks可以由ExecuteWithHookContext接口创建实现。这是一个空的接口,简单地调用run方法和HookContext。HookContext 有很多关于查询、HIVE实例和用户的信息。可以很容易地利用信息来检测数据湖如何被其用户使用。
-
- public HookContext(QueryPlan queryPlan, QueryState queryState,
- Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress,
- String hiveInstanceAddress, String operationId, String sessionId, String threadId,
- boolean isHiveServerQuery, PerfLogger perfLogger, QueryInfo queryInfo) throws Exception {
复制代码
实现HookContext的任何钩子得到查询计划(QueIGrPy)。在QueryPlan的hood下面,有许多getters 被用来收集关于查询的信息。举几个例子:
- getQueryProperties - 获取有关查询的详细信息,包括查询是否具有joins,分组,分析函数或任何排序/排序操作。
- getQueryStartTime - 返回查询的开始时间。
- getOperationName - 返回查询执行的操作类型,例如CREATETABLE,DROPTABLE,ALTERDATABASE等,
- getQueryStr - 以字符串形式返回查询。
要创建我们自己的Hive钩子,我们只需要一个实现ExecuteWithHookContext的类,并使用我们的自定义逻辑覆盖其run方法来捕获数据。
- public class CustomHook implements ExecuteWithHookContext {
- private static final Logger logger = Logger.getLogger(CustomHook.class.getName());
- public void run(HookContext hookContext) throws Exception {
- assert (hookContext.getHookType() == HookType.PRE_EXEC_HOOK);
- SessionState ss = SessionState.get();
- UserGroupInformation ugi = hookContext.getUgi();
- Set<ReadEntity> inputs = hookContext.getInputs();
- QueryPlan plan = hookContext.getQueryPlan();
- this.run(ss, ugi, plan, inputs);
- }
复制代码
需要SessionState和UserGroupInformation来收集有关Hive session 及其users的信息。
-
- public void run(SessionState sess, UserGroupInformation ugi, QueryPlan qpln, Set<ReadEntity> inputs)
- throws Exception {
- if (sess != null) {
- String qid = sess.getQueryId() == null ? qpln.getQueryId() : sess.getQueryId();
- String QueryID = qid;
- String Query = sess.getCmd().trim();
- String QueryType = sess.getCommandType();
- // get all information about query
- if (qpln != null) {
- Long Query_Start_Time = qpln.getQueryStartTime();
- QueryProperties queryProps = qpln.getQueryProperties();
- if (queryProps != null) {
- boolean Has_Join = queryProps.hasJoin();
- boolean Has_Group_By = queryProps.hasGroupBy();
- boolean Has_Sort_By = queryProps.hasSortBy();
- boolean Has_Order_By = queryProps.hasOrderBy();
- boolean Has_Distribute_By = queryProps.hasDistributeBy();
- boolean Has_Cluster_By = queryProps.hasClusterBy();
- boolean Has_Windowing = queryProps.hasWindowing();
- }
- }
- // get user id
- String username = sess.getUserName() == null ? ugi.getUserName() : sess.getUserName();
- // get list of database@table names
- List<String> tables = new ArrayList<String>();
- for (Object o : inputs) {
- tables.add(o.toString());
- }
- // Add logic here to format logging msg
- // logger.info(msg)
- }
- }
复制代码
在分配挂钩之前,应将已编译的jar添加到Hive类路径中。 可以在hive-site.xml属性hive.aux.jars.path定义的位置添加jar。 可以使用属性hive.exec.pre.hooks将预执行挂钩设置为自定义挂钩的类。 使用Hive CLI,我们可以执行以下操作:
- set hive.exec.pre.hooks=com.myApp.CustomHook;
复制代码
一旦设置了pre-execution挂钩,就应该为每个用户的每个查询执行CustomHook的代码。 CustomHook收集的信息可以在通用日志记录存储库中记录为逗号分隔值,稍后可以在任何BI工具或Excel文件中提取,以找出有关数据湖使用模式的各种统计信息。
注意事项
- 虽然挂钩是捕获信息的好方法,但它们可能会增加查询执行的延迟。 钩子中的处理可以保持最小以避免这种开销。
- 通过Hive钩子无法捕获有关通过Spark的HiveContext在Hive表上完成的处理的信息。 Spark提供了自己的钩子机制。
|