阿飞 发表于 2020-7-13 10:45:35

Flink 1.11使用sql将流式数据写入文件系统【Demo及说明】

本帖最后由 阿飞 于 2020-7-13 10:47 编辑


Flink提供了一个file system connector,可以使用DDL创建一个table,然后使用sql的方法将数据写入hdfs、local等文件系统,支持的写入格式包括json、csv、avro、parquet、orc。
一个最简单的DDL如下:

CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
h string,
m string   
) PARTITIONED BY (dt,h,m) WITH (
   'connector'='filesystem',
   'path'='file:///tmp/abc',
   'format'='orc'
);

下面我们简单的介绍一下相关的概念和如何使用。


滚动策略




[*]在写入列格式(比如parquet、orc)的时候,上述的配置和checkpoint的间隔一起来控制滚动策略,也就是说sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval、checkpoint间隔,这三个选项,只要有一个条件达到了,然后就会触发分区文件的滚动,结束上一个文件的写入,生成新文件。
[*]对于写入行格式的数据,比如json、csv,主要是靠sink.rolling-policy.file-size、sink.rolling-policy.rollover-interval,也就是文件的大小和时间来控制写入数据的滚动策略.



分区提交

在往一个分区写完了数据之后,我们希望做一些工作来通知下游。比如在分区目录写一个SUCCESS文件,或者是对于hive来说,去更新metastore的数据,自动刷新一下分区等等。
分区的提交主要依赖于触发器和提交的策略:


[*]触发器:即什么时候触发分区的提交,
[*]提交策略:也就是分区写完之后我们做什么,目前系统提供了两种内置策略:1.往分区目录写一个空SUCCESS文件;2.更新元数据.



分区提交触发器





[*]process-time. 这种提交方式依赖于系统的时间,一旦遇到数据延迟等情况,会造成分区和分区的数据不一致。
[*]partition-time :这种情况需要从分区字段里抽取出来相应的pattern,具体可参考下一个段落分区的抽取。
[*]sink.partition-commit.delay:一旦这个数值设置不为0,则在process-time情况下,当系统时间大于分区创建时间加上delay延迟,会触发分区提交; 如果是在partition-time 情况下,则需要水印大于分区创建时间加上delay时间,会触发分区提交.



第一个参数process-time、partition-time,我们不用做过多的解释,就类似于flink中的processtime和eventtime。

第二个参数sink.partition-commit.delay我们用实际案例解释下:
比如我们配置的是分区是/yyyy-MM-dd/HH/,写入的是ORC列格式,checkpoint配置的间隔是一分钟,也就是默认情况下会每分钟生成一个orc文件,最终会在每个分区(/yyyy-MM-dd/HH/)下面生成60个orc文件。

比如当前系统正在写入/day=2020-07-06/h=10/分区的数据,那么这个分区的创建时间是2020-07-06 10:00:00,如果这个delay配置采用的是默认值,也就是0s,这个时候当写完了一个ORC文件,也就是2020-07-06 10:01:00分钟的时候,就会触发分区提交,比如更新hive的元数据,这个时候我们去查询hive就能查到刚刚写入的文件;如果我们想/day=2020-07-06/h=10/这个分区的60个文件都写完了再更新分区,那么我们可以将这个delay设置成 1h,也就是等到2020-07-06 11:00:00的时候才会触发分区提交,我们才会看到/2020-07-06/10/分区下面的所有数据


分区时间的抽取
从分区值里抽取分区时间,我们可以理解为上面触发器参数配置为partition-time的时候,分区的创建时间,当水印大于这个时间+delay的时候触发分区的提交.




自定义抽取分区时间的话,需要实现PartitionTimeExtractor接口:
public interface PartitionTimeExtractor extends Serializable {

      String DEFAULT = "default";
      String CUSTOM = "custom";

      /**
         * Extract time from partition keys and values.
         */
      LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);
    ...................
}



分区提交策略
定义了分区提交的策略,也就是写完分区数据之后做什么事情,目前系统提供了以下行为:

metastore,只支持hive table,也就是写完数据之后,更新hive的元数据.
success file: 写完数据,往分区文件写一个success file.

自定义



完整示例
定义实体类

      public static class UserInfo implements java.io.Serializable{
                private String userId;
                private Double amount;
                private Timestamp ts;

                public String getUserId(){
                        return userId;
                }

                public void setUserId(String userId){
                        this.userId = userId;
                }

                public Double getAmount(){
                        return amount;
                }

                public void setAmount(Double amount){
                        this.amount = amount;
                }

                public Timestamp getTs(){
                        return ts;
                }

                public void setTs(Timestamp ts){
                        this.ts = ts;
                }
      }


自定义source

      public static class MySource implements SourceFunction<UserInfo>{

                String userids[] = {
                              "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
                              "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
                              "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
                              "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
                              "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
                };

                @Override
                public void run(SourceContext<UserInfo> sourceContext) throws Exception{
                        while (true){
                              String userid = userids[(int) (Math.random() * (userids.length - 1))];
                              UserInfo userInfo = new UserInfo();
                              userInfo.setUserId(userid);
                              userInfo.setAmount(Math.random() * 100);
                              userInfo.setTs(new Timestamp(new Date().getTime()));
                              sourceContext.collect(userInfo);
                              Thread.sleep(100);
                        }
                }

                @Override
                public void cancel(){

                }
      }


写入file
通过sql的ddl创建一个最简单的基于process time的table,然后写入数据.

在这个实例中,我们开启了checkpoint的时间间隔是10s,所以会每隔10s写入一个orc文件.

                StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
                bsEnv.enableCheckpointing(10000);
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
                DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());
                String sql = "CREATE TABLE fs_table (\n" +
                           "user_id STRING,\n" +
                           "order_amount DOUBLE,\n" +
                           "dt STRING," +
                           "h string," +
                           "m string\n" +
                           ") PARTITIONED BY (dt,h,m) WITH (\n" +
                           "'connector'='filesystem',\n" +
                           "'path'='file:///tmp/abc',\n" +
                           "'format'='orc'\n" +
                           ")";
                tEnv.executeSql(sql);
                tEnv.createTemporaryView("users", dataStream);
                String insertSql = "insert intofs_table SELECT userId, amount, " +
                                 " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";

                tEnv.executeSql(insertSql);


完整代码
package connectors.sql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Date;

/**
* @author zhangjun 欢迎关注我的公众号[大数据技术与应用实战],获取更多精彩实战内容
* <p>
* 流式数据以sql的形式写入file
*/
public class StreamingWriteFile{
      public static void main(String[] args) throws Exception{
                StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
                bsEnv.enableCheckpointing(10000);
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
                DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource());
                String sql = "CREATE TABLE fs_table (\n" +
                           "user_id STRING,\n" +
                           "order_amount DOUBLE,\n" +
                           "dt STRING," +
                           "h string," +
                           "m string\n" +
                           ") PARTITIONED BY (dt,h,m) WITH (\n" +
                           "'connector'='filesystem',\n" +
                           "'path'='file:///tmp/abc',\n" +
                           "'format'='orc'\n" +
                           ")";
                tEnv.executeSql(sql);
                tEnv.createTemporaryView("users", dataStream);
                String insertSql = "insert intofs_table SELECT userId, amount, " +
                                 " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";

                tEnv.executeSql(insertSql);

      }

      public static class MySource implements SourceFunction<UserInfo>{

                String userids[] = {
                              "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
                              "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
                              "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
                              "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
                              "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
                };

                @Override
                public void run(SourceContext<UserInfo> sourceContext) throws Exception{
                        while (true){
                              String userid = userids[(int) (Math.random() * (userids.length - 1))];
                              UserInfo userInfo = new UserInfo();
                              userInfo.setUserId(userid);
                              userInfo.setAmount(Math.random() * 100);
                              userInfo.setTs(new Timestamp(new Date().getTime()));
                              sourceContext.collect(userInfo);
                              Thread.sleep(100);
                        }
                }

                @Override
                public void cancel(){

                }
      }

      public static class UserInfo implements java.io.Serializable{
                private String userId;
                private Double amount;
                private Timestamp ts;

                public String getUserId(){
                        return userId;
                }

                public void setUserId(String userId){
                        this.userId = userId;
                }

                public Double getAmount(){
                        return amount;
                }

                public void setAmount(Double amount){
                        this.amount = amount;
                }

                public Timestamp getTs(){
                        return ts;
                }

                public void setTs(Timestamp ts){
                        this.ts = ts;
                }
      }
}

代码下载






链接:
https://blog.csdn.net/zhangjun5965/article/details/107162547
页: [1]
查看完整版本: Flink 1.11使用sql将流式数据写入文件系统【Demo及说明】