分享

flink 1.11 使用sql将流式数据写入hive【Demo及说明】

阿飞 2020-7-13 11:04:36 发表于 Flink案例 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 10784


修改hive配置
上一篇介绍了使用sql将流式数据写入文件系统,这次我们来介绍下使用sql将文件写入hive,对于如果想写入已经存在的hive表,则至少需要添加以下两个属性. 写入hive底层还是和写入文件系统一样的,所以对于其他具体的配置参考上一篇 .Flink 1.11使用sql将流式数据写入文件系统


[mw_shl_code=java,true]alter table table_name set TBLPROPERTIES ('is_generic'='false');

alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore');

//如果想使用eventtime分区
alter table table_name set TBLPROPERTIES ('sink.partition-commit.trigger'='partition-time');

[/mw_shl_code]

案例讲解
下面我们讲解一下,如何使用java程序来构建一个flink程序来写入hive。


引入相关的pom

[mw_shl_code=xml,true]      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
[/mw_shl_code]

构造hive catalog

[mw_shl_code=java,true]                //构造hive catalog
                String name = "myhive";
                String defaultDatabase = "default";
                String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
                String version = "3.1.2";

                HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
                tEnv.registerCatalog("myhive", hive);
                tEnv.useCatalog("myhive");
                tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
                tEnv.useDatabase("db1");
[/mw_shl_code]

创建hive表
如果目前系统中没有存在相应的hive表,可以通过在程序中执行相应的DDL建表语句来建表,如果已经存在了,就把这段代码省略,使用上面的hive命令修改现有表,添加相应的属性。

[mw_shl_code=sql,true]CREATE EXTERNAL TABLE `fs_table`(
  `user_id` string,
  `order_amount` double)
PARTITIONED BY (
  `dt` string,
  `h` string,
  `m` string)
stored as ORC
TBLPROPERTIES (
  'sink.partition-commit.policy.kind'='metastore',
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)
[/mw_shl_code]

将流数据插入hive,

[mw_shl_code=sql,true]        String insertSql = "insert into  fs_table SELECT userId, amount, " +
                                   " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
                tEnv.executeSql(insertSql);
[/mw_shl_code]

完整代码下载:
[mw_shl_code=java,true]package connectors.sql;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import javax.annotation.Nullable;

import java.sql.Timestamp;
/**
* @author zhangjun 欢迎关注我的公众号[大数据技术与应用实战],获取更多精彩实战内容
* <p>
* 流式数据以sql的形式写入hive
*/
public class StreamingWriteHive{
        public static void main(String[] args) throws Exception{
                StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
                bsEnv.enableCheckpointing(10000);
                bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
                DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource())
                                                       .assignTimestampsAndWatermarks(
                                                                       new AssignerWithPunctuatedWatermarks<UserInfo>(){
                                                                               long water = 0l;
                                                                               @Nullable
                                                                               @Override
                                                                               public Watermark checkAndGetNextWatermark(
                                                                                               UserInfo lastElement,
                                                                                               long extractedTimestamp){
                                                                                       return new Watermark(water);
                                                                               }

                                                                               @Override
                                                                               public long extractTimestamp(
                                                                                               UserInfo element,
                                                                                               long recordTimestamp){
                                                                                       water = element.getTs().getTime();
                                                                                       return water;
                                                                               }
                                                                       });


                //构造hive catalog
                String name = "myhive";
                String defaultDatabase = "default";
                String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
                String version = "3.1.2";

                HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
                tEnv.registerCatalog("myhive", hive);
                tEnv.useCatalog("myhive");
                tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
                tEnv.useDatabase("db1");

                tEnv.createTemporaryView("users", dataStream);

//      如果hive中已经存在了相应的表,则这段代码省略
//                String hiveSql = "CREATE external TABLE fs_table (\n" +
//                                 "  user_id STRING,\n" +
//                                 "  order_amount DOUBLE" +
//                                 ") partitioned by (dt string,h string,m string) " +
//                                 "stored as ORC " +
//                                 "TBLPROPERTIES (\n" +
//                                 "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
//                                 "  'sink.partition-commit.delay'='0s',\n" +
//                                 "  'sink.partition-commit.trigger'='partition-time',\n" +
//                                 "  'sink.partition-commit.policy.kind'='metastore'" +
//                                 ")";
//                tEnv.executeSql(hiveSql);

                String insertSql = "insert into  fs_table SELECT userId, amount, " +
                                   " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
                tEnv.executeSql(insertSql);
        }


        public static class MySource implements SourceFunction<UserInfo>{

                String userids[] = {
                                "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
                                "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
                                "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
                                "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
                                "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
                };

                @Override
                public void run(SourceContext<UserInfo> sourceContext) throws Exception{

                        while (true){
                                String userid = userids[(int) (Math.random() * (userids.length - 1))];
                                UserInfo userInfo = new UserInfo();
                                userInfo.setUserId(userid);
                                userInfo.setAmount(Math.random() * 100);
                                userInfo.setTs(new Timestamp(System.currentTimeMillis()));
                                sourceContext.collect(userInfo);
                                Thread.sleep(100);
                        }
                }

                @Override
                public void cancel(){

                }
        }

        public static class UserInfo implements java.io.Serializable{
                private String userId;
                private Double amount;
                private Timestamp ts;

                public String getUserId(){
                        return userId;
                }

                public void setUserId(String userId){
                        this.userId = userId;
                }

                public Double getAmount(){
                        return amount;
                }

                public void setAmount(Double amount){
                        this.amount = amount;
                }

                public Timestamp getTs(){
                        return ts;
                }

                public void setTs(Timestamp ts){
                        this.ts = ts;
                }
        }
}[/mw_shl_code]

下载:
StreamingWriteHive.java.rar (2.02 KB, 下载次数: 2439)

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条