flink 1.11 使用sql将流式数据写入hive【Demo及说明】
修改hive配置
上一篇介绍了使用sql将流式数据写入文件系统,这次我们来介绍下使用sql将文件写入hive,对于如果想写入已经存在的hive表,则至少需要添加以下两个属性. 写入hive底层还是和写入文件系统一样的,所以对于其他具体的配置参考上一篇 .Flink 1.11使用sql将流式数据写入文件系统
alter table table_name set TBLPROPERTIES ('is_generic'='false');
alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore');
//如果想使用eventtime分区
alter table table_name set TBLPROPERTIES ('sink.partition-commit.trigger'='partition-time');
案例讲解
下面我们讲解一下,如何使用java程序来构建一个flink程序来写入hive。
引入相关的pom
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
构造hive catalog
//构造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("db1");
创建hive表
如果目前系统中没有存在相应的hive表,可以通过在程序中执行相应的DDL建表语句来建表,如果已经存在了,就把这段代码省略,使用上面的hive命令修改现有表,添加相应的属性。
CREATE EXTERNAL TABLE `fs_table`(
`user_id` string,
`order_amount` double)
PARTITIONED BY (
`dt` string,
`h` string,
`m` string)
stored as ORC
TBLPROPERTIES (
'sink.partition-commit.policy.kind'='metastore',
'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)
将流数据插入hive,
String insertSql = "insert intofs_table SELECT userId, amount, " +
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
tEnv.executeSql(insertSql);
完整代码下载:
package connectors.sql;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import javax.annotation.Nullable;
import java.sql.Timestamp;
/**
* @author zhangjun 欢迎关注我的公众号[大数据技术与应用实战],获取更多精彩实战内容
* <p>
* 流式数据以sql的形式写入hive
*/
public class StreamingWriteHive{
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(10000);
bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource())
.assignTimestampsAndWatermarks(
new AssignerWithPunctuatedWatermarks<UserInfo>(){
long water = 0l;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(
UserInfo lastElement,
long extractedTimestamp){
return new Watermark(water);
}
@Override
public long extractTimestamp(
UserInfo element,
long recordTimestamp){
water = element.getTs().getTime();
return water;
}
});
//构造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("db1");
tEnv.createTemporaryView("users", dataStream);
// 如果hive中已经存在了相应的表,则这段代码省略
// String hiveSql = "CREATE external TABLE fs_table (\n" +
// "user_id STRING,\n" +
// "order_amount DOUBLE" +
// ") partitioned by (dt string,h string,m string) " +
// "stored as ORC " +
// "TBLPROPERTIES (\n" +
// "'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
// "'sink.partition-commit.delay'='0s',\n" +
// "'sink.partition-commit.trigger'='partition-time',\n" +
// "'sink.partition-commit.policy.kind'='metastore'" +
// ")";
// tEnv.executeSql(hiveSql);
String insertSql = "insert intofs_table SELECT userId, amount, " +
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
tEnv.executeSql(insertSql);
}
public static class MySource implements SourceFunction<UserInfo>{
String userids[] = {
"4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
"72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
"aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
"3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
"e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
};
@Override
public void run(SourceContext<UserInfo> sourceContext) throws Exception{
while (true){
String userid = userids[(int) (Math.random() * (userids.length - 1))];
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userid);
userInfo.setAmount(Math.random() * 100);
userInfo.setTs(new Timestamp(System.currentTimeMillis()));
sourceContext.collect(userInfo);
Thread.sleep(100);
}
}
@Override
public void cancel(){
}
}
public static class UserInfo implements java.io.Serializable{
private String userId;
private Double amount;
private Timestamp ts;
public String getUserId(){
return userId;
}
public void setUserId(String userId){
this.userId = userId;
}
public Double getAmount(){
return amount;
}
public void setAmount(Double amount){
this.amount = amount;
}
public Timestamp getTs(){
return ts;
}
public void setTs(Timestamp ts){
this.ts = ts;
}
}
}
下载:
————————————————
遇到的坑
问题详解
对于如上的程序和sql,如果配置了是使用eventtime,在此程序中配置了’sink.partition-commit.trigger’=‘partition-time’,最后发现程序没法提交分区。
分析了一下源码,问题是出在了这个方法,org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions。先贴上代码:
@Override
public List<String> committablePartitions(long checkpointId) {
if (!watermarks.containsKey(checkpointId)) {
throw new IllegalArgumentException(String.format(
"Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
checkpointId, watermarks));
}
long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();
List<String> needCommit = new ArrayList<>();
Iterator<String> iter = pendingPartitions.iterator();
while (iter.hasNext()) {
String partition = iter.next();
//通过分区的值抽取分区的时间.
LocalDateTime partTime = extractor.extract(
partitionKeys, extractPartitionValues(new Path(partition)));
//判断水印是否大于分区创建时间+延迟时间
if (watermark > toMills(partTime) + commitDelay) {
needCommit.add(partition);
iter.remove();
}
}
return needCommit;
}
系统通过分区值来抽取相应的分区创建时间,然后进行比对,比如我们设置的pattern是 $dt h:h:h:m:00 , 某一时刻我们正在往 /2020-07-06/18/20/ 这个分区下写数据,那么程序根据分区值,得到的pattern将会是2020-07-06 18:20:00,这个值在sql中是根据DATA_FORMAT函数获取的。
这个值是带有时区的, 也是我想要的, 比如我们的时区设置为东八区,2020-07-06 18:20:00这个时间是东八区的时间,换成标准UTC时间是减去八个小时,也就是2020-07-06 10:20:00,而源码中的toMills函数在处理这个东八区的时间时,并没有任何加入任何时区的处理,把这个其实应该是东八区的时间当做了UTC时间来处理,这样计算出来的值就比实际值大8小时,导致一直没有触发分区的提交。
如果我们在数据源出构造的分区是UTC时间,也就是不带分区的时间,那么这个逻辑就是没有问题的,但是这样又不符合我们的实际情况,比如对于分区2020-07-06 18:20:00,我希望我的分区肯定是东八区的时间,而不是比东八区小8个小时的UTC时间2020-07-06 10:20:00。
所以针对上述情况,有两种解决方案,一种是自定义一个分区抽取类,第二,就是修改源码,改一下现在的缺省的时间分区抽取类。我个人认为修改一下缺省类更好理解,因为目前写入文件和hive这块配置和概念有点多,我不想太增加过多的配置来增加用户的难度,应该尽可能的用缺省值就能使程序很好的运行。
我们看下flink中的StreamingFileSink类,构造分区桶的时候默认是使用的DateTimeBucketAssigner,其构造分区路径就是带有时区概念的,默认就用的是本地时区。
public DateTimeBucketAssigner(String formatString) {
this(formatString, ZoneId.systemDefault());
}
修改方案
这个问题,也不知道算不算一个bug,我给官方提交了一个ISSUE,但是官方没有采纳,不过我觉得不符合我的习惯,所以我对这个功能进行了修改,让partition.time-extractor.timestamp-pattern提取的partiiton是带有时区的,默认情况下是本地时区。如果是非本地时区,可以指定时区,通过参数partition.time-extractor.time-zone来指定,我们可以通下面的代码获取有效的时区。
Set<String> zoneIds = ZoneId.getAvailableZoneIds();
zoneIds.stream().forEach(System.out::println);
比如我们东八区默认使用 Asia/Shanghai。
我基于社区的flink的tag release-1.11.0-rc4,我改了一下代码
将代码放到了github上。
https://github.com/zhangjun0x01/flink/tree/release-1.11.0-rc4
原文链接:https://blog.csdn.net/zhangjun5965/article/details/107201315/
页:
[1]