flink 报AppendStreamTableSink requires that Table has only insert changes
本帖最后由 ighack 于 2020-5-25 09:45 编辑tableEnv.connect(new Kafka()
.version("universal")
.startFromEarliest()
.topic(topic)
.properties(properties)
)
.withFormat(new Json().failOnMissingField(false))
.withSchema(new Schema().field("name",DataTypes.STRING())
.field("num",DataTypes.INT()))
.inAppendMode()
.createTemporaryTable("usert")
val result = tableEnv.sqlQuery("SELECT name, sum(num) as onum FROM usert group by name")
tableEnv.connect(new Kafka()
.version("universal")
.topic(sink_topic)
.properties(props)
.sinkPartitionerRoundRobin()
)
.withFormat(new Json().failOnMissingField(false))
.withSchema(new Schema().field("name",DataTypes.STRING())
.field("onum",DataTypes.INT()))
.inAppendMode()
.createTemporaryTable("userp")
result.insertInto("userp")
网上有的说不能group by (我需要group by)
有的说要加时间窗 (不加时间窗行不行)
可是我这个就是一个非常简单的东西。不知道要怎么改啊
模式改为这个试试:.inUpsertMode()
阿飞 发表于 2020-5-25 18:08
模式改为这个试试:.inUpsertMode()
Unknown value for property 'update-mode'.
感谢分享
页:
[1]