分享

Flink集成Hive之Hive Catalog与Hive Dialect应用

本帖最后由 levycui 于 2021-3-24 19:07 编辑
问题导读:
1、什么是Hive Catalog?
2、如何使用Hive Catalog?
3、什么是Hive Dialect?
4、如何使用Hive Dialect?

上一篇:Flink1.12 集成hive


在上一篇分享Flink集成Hive之快速入门--以Flink1.12为例中,介绍了Flink集成Hive的进本步骤。本文分享,将继续介绍Flink集成Hive的另外两个概念:Hive Catalog与Hive Dialect。本文包括以下内容,希望对你有所帮助。

  •     什么是Hive Catalog
  •     如何使用Hive Catalog
  •     什么是Hive Dialect
  •     如何使用Hive Dialect

什么是Hive Catalog

我们知道,Hive使用Hive Metastore(HMS)存储元数据信息,使用关系型数据库来持久化存储这些信息。所以,Flink集成Hive需要打通Hive的metastore,去管理Flink的元数据,这就是Hive Catalog的功能。

Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元数据。Hive Catalog可以将元数据进行持久化,这样后续的操作就可以反复使用这些表的元数据,而不用每次使用时都要重新注册。如果不去持久化catalog,那么在每个session中取处理数据,都要去重复地创建元数据对象,这样是非常耗时的。

如何使用Hive Catalog

HiveCatalog是开箱即用的,所以,一旦配置好Flink与Hive集成,就可以使用HiveCatalog。比如,我们通过FlinkSQL 的DDL语句创建一张kafka的数据源表,立刻就能查看该表的元数据信息。

HiveCatalog可以处理两种类型的表:一种是Hive兼容的表,另一种是普通表(generic table)。其中Hive兼容表是以兼容Hive的方式来存储的,所以,对于Hive兼容表而言,我们既可以使用Flink去操作该表,又可以使用Hive去操作该表。

普通表是对Flink而言的,当使用HiveCatalog创建一张普通表,仅仅是使用Hive MetaStore将其元数据进行了持久化,所以可以通过Hive查看这些表的元数据信息(通过DESCRIBE FORMATTED命令),但是不能通过Hive去处理这些表,因为语法不兼容。

对于是否是普通表,Flink使用is_generic属性进行标识。默认情况下,创建的表是普通表,即is_generic=true,如果要创建Hive兼容表,需要在建表属性中指定is_generic=false。

    尖叫提示:

    由于依赖Hive Metastore,所以必须开启Hive MetaStore服务

代码中使用Hive Catalog
  1. <div>   EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
  2.         TableEnvironment tableEnv = TableEnvironment.create(settings);
  3.         String name            = "myhive";
  4.         String defaultDatabase = "default";
  5.         String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";
  6.         HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
  7.         tableEnv.registerCatalog("myhive", hive);
  8.         // 使用注册的catalog
  9.         tableEnv.useCatalog("myhive");</div>
复制代码


Flink SQLCli中使用Hive Catalog

在FlinkSQL Cli中使用Hive Catalog很简单,只需要配置一下sql-cli-defaults.yaml文件即可。配置内容如下:
  1. catalogs:
  2.    - name: myhive
  3.      type: hive
  4.      default-database: default
  5.      hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
复制代码

2021-03-24_190433.jpg

在FlinkSQL Cli中创建一张kafka表,该表默认为普通表,即is_generic=true
  1. CREATE TABLE user_behavior (
  2.     `user_id` BIGINT, -- 用户id
  3.     `item_id` BIGINT, -- 商品id
  4.     `cat_id` BIGINT, -- 品类id
  5.     `action` STRING, -- 用户行为
  6.     `province` INT, -- 用户所在的省份
  7.     `ts` BIGINT, -- 用户行为发生的时间戳
  8.     `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
  9.     `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
  10.      WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark
  11. ) WITH (
  12.     'connector' = 'kafka', -- 使用 kafka connector
  13.     'topic' = 'user_behavior', -- kafka主题
  14.     'scan.startup.mode' = 'earliest-offset', -- 偏移量
  15.     'properties.group.id' = 'group1', -- 消费者组
  16.     'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  17.     'format' = 'json', -- 数据源格式为json
  18.     'json.fail-on-missing-field' = 'true',
  19.     'json.ignore-parse-errors' = 'false'
  20. );
复制代码

我们可以在Hive客户端中查看该表的元数据信息
  1. hive (default)> desc formatted  user_behavior;
  2. Table Parameters:               
  3.        ...
  4.         is_generic              true               
  5.       ...         
复制代码

从上面的元数据信息可以看出,is_generic=true,说明该表是一张普通表,如果在Hive中去查看该表,则会报错。

上面创建的表是普通表,该表不能使用Hive去查询。那么,该如何创建一张Hive兼容表呢?我们只需要在建表的属性中显示指定is_generic=false即可,具体如下:
  1. CREATE TABLE hive_compatible_tbl (
  2.     `user_id` BIGINT, -- 用户id
  3.     `item_id` BIGINT, -- 商品id
  4.     `cat_id` BIGINT, -- 品类id
  5.     `action` STRING, -- 用户行为
  6.     `province` INT, -- 用户所在的省份
  7.     `ts` BIGINT -- 用户行为发生的时间戳
  8. ) WITH (
  9.     'connector' = 'kafka', -- 使用 kafka connector
  10.     'topic' = 'user_behavior', -- kafka主题
  11.     'scan.startup.mode' = 'earliest-offset', -- 偏移量
  12.     'properties.group.id' = 'group1', -- 消费者组
  13.     'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  14.     'format' = 'json', -- 数据源格式为json
  15.     'json.fail-on-missing-field' = 'true',
  16.     'json.ignore-parse-errors' = 'false',
  17.     'is_generic' = 'false'
  18. );
复制代码

当我们在Hive中查看该表的元数据信息时,可以看出:is_generic =false
  1. hive (default)> desc formatted hive_compatible_tbl;
  2. Table Parameters:               
  3.         ...           
  4.         is_generic              false               
  5.         ...
复制代码

我们可以使用FlinkSQL Cli或者HiveCli向该表中写入数据,然后分别通过FlinkSQL Cli和Hive Cli去查看该表数据的变化
  1. hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
  2. hive (default)> select * from hive_compatible_tbl;
复制代码

再在FlinkSQL Cli中查看该表,
  1. Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
  2.                    user_id                   item_id                    action
  3.                       2020                      1221                       buy
复制代码

同样,我们可以在FlinkSQL Cli中去向该表中写入数据:
  1. Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
  2. Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
复制代码
  1.                    user_id                   item_id                    action
  2.                       2020                      1221                       buy
  3.                       2020                      1222                       fav
复制代码

    尖叫提示:

    对于Hive兼容的表,需要注意数据类型,具体的数据类型对应关系以及注意点如下


2021-03-24_190531.jpg
2021-03-24_190545.jpg

注意:

  •     Hive CHAR(p) 类型的最大长度为255
  •     Hive VARCHAR(p)类型的最大长度为65535
  •     Hive MAP类型的key仅支持基本类型,而Flink’s MAP 类型的key执行任意类型
  •     Hive不支持联合数据类型,比如STRUCT
  •     Hive’s TIMESTAMP 的精度是 9 , Hive UDFs函数只能处理 precision <= 9的 TIMESTAMP 值
  •     Hive 不支持 Flink提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET类型
  •     FlinkINTERVAL 类型与 Hive INTERVAL 类型不一样

上面介绍了普通表和Hive兼容表,那么我们该如何使用Hive的语法进行建表呢?这个时候就需要使用Hive Dialect。

什么是Hive Dialect

从Flink1.11.0开始,只要开启了Hive dialect配置,用户就可以使用HiveQL语法,这样我们就可以在Flink中使用Hive的语法使用一些DDL和DML操作。

Flink目前支持两种SQL方言(SQL dialects),分别为:default和hive。默认的SQL方言是default,如果要使用Hive的语法,需要将SQL方言切换到hive。

如何使用Hive Dialect

在SQL Cli中使用Hive dialect

使用hive dialect只需要配置一个参数即可,该参数名称为:table.sql-dialect。我们就可以在sql-client-defaults.yaml配置文件中进行配置,也可以在具体的会话窗口中进行设定,对于SQL dialect的切换,不需要进行重启session。
  1. execution:
  2.   planner: blink
  3.   type: batch
  4.   result-mode: table
  5. configuration:
  6.   table.sql-dialect: hive
复制代码

如果我们需要在SQL Cli中进行切换hive dialect,可以使用如下命令:
  1. Flink SQL> set table.sql-dialect=hive; -- 使用hive dialect
  2. Flink SQL> set table.sql-dialect=default; -- 使用default dialect
复制代码

    尖叫提示:

    一旦切换到了hive dialect,就只能使用Hive的语法建表,如果尝试使用Flink的语法建表,则会报错
  1. 在Table API中配合dialect
  2. EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
  3. TableEnvironment tableEnv = TableEnvironment.create(settings);
  4. // 使用hive dialect
  5. tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  6. // 使用 default dialect
  7. tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
复制代码

操作示例
  1. Flink SQL> set table.sql-dialect=hive;
  2. -- 使用Hive语法创建一张表
  3. CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
  4.   `id` int COMMENT 'id',
  5.   `name` string COMMENT '名称',
  6.   `age` int COMMENT '年龄'
  7. )
  8. COMMENT 'hive dialect表测试'
  9. ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
复制代码

进入Hive客户端去查看该表的元数据信息
  1. desc formatted hive_dialect_tbl;
  2. col_name        data_type       comment
  3. # col_name              data_type               comment            
  4.                  
  5. id                      int                                         
  6. name                    string                                      
  7. age                     int                                         
  8.                  
  9. # Detailed Table Information            
  10. Database:               default                  
  11. Owner:                  null                     
  12. CreateTime:             Mon Dec 21 17:23:48 CST 2020     
  13. LastAccessTime:         UNKNOWN                  
  14. Retention:              0                        
  15. Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl        
  16. Table Type:             MANAGED_TABLE            
  17. Table Parameters:               
  18.         comment                 hive dialect表测试     
  19.         is_generic              false               
  20.         transient_lastDdlTime   1608542628         
  21.                  
  22. # Storage Information            
  23. SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
  24. InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
  25. OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
  26. Compressed:             No                       
  27. Num Buckets:            -1                       
  28. Bucket Columns:         []                       
  29. Sort Columns:           []                       
  30. Storage Desc Params:            
  31.         field.delim             ,                  
  32.         serialization.format    ,                  
复制代码

很明显,该表是一张Hive兼容表,即is_generic=false。

使用FlinkSQLCli向该表中写入一条数据:
  1. Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;
复制代码

我们也可以在Hive的Cli中去操作该表
  1. hive (default)> select * from hive_dialect_tbl;
  2. hive (default)> insert into hive_dialect_tbl select 2,'jack',22;
复制代码

以下是使用Hive方言的一些注意事项。

  •     Hive dialect只能用于操作Hive表,不能用于普通表。Hive方言应与HiveCatalog一起使用。
  •     虽然所有Hive版本都支持相同的语法,但是是否有特定功能仍然取决于使用的Hive版本。例如,仅在Hive-2.4.0或更高版本中支持更新数据库位置。
  •     Hive和Calcite具有不同的保留关键字。例如,default在Calcite中是保留关键字,在Hive中是非保留关键字。所以,在使用Hive dialect时,必须使用反引号(`)引用此类关键字,才能将其用作标识符。
  •     在Hive中不能查询在Flink中创建的视图。

当然,一旦开启了Hive dialect,我们就可以按照Hive的操作方式在Flink中去处理Hive的数据了,具体的操作与Hive一致,本文不再赘述。

总结

本文主要介绍了Hive Catalog和Hive Dialect。其中Hive Catalog的作用是持久化Flink的元数据信息,Hive Dialect是支持Hive语法的一个配置参数,这两个概念是Flink集成Hive的关键。下一篇分享将介绍如何使用Flink读写Hive。

作者:大数据技术与数仓
来源:https://www.jianshu.com/p/be3b94225f4a

最新经典文章,欢迎关注公众号

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条