本帖最后由 ighack 于 2020-5-25 09:45 编辑
[mw_shl_code=scala,true]tableEnv.connect(new Kafka()
.version("universal")
.startFromEarliest()
.topic(topic)
.properties(properties)
)
.withFormat(new Json().failOnMissingField(false))
.withSchema(new Schema().field("name",DataTypes.STRING())
.field("num",DataTypes.INT()))
.inAppendMode()
.createTemporaryTable("usert")
val result = tableEnv.sqlQuery("SELECT name, sum(num) as onum FROM usert group by name")
tableEnv.connect(new Kafka()
.version("universal")
.topic(sink_topic)
.properties(props)
.sinkPartitionerRoundRobin()
)
.withFormat(new Json().failOnMissingField(false))
.withSchema(new Schema().field("name",DataTypes.STRING())
.field("onum",DataTypes.INT()))
.inAppendMode()
.createTemporaryTable("userp")
result.insertInto("userp")[/mw_shl_code]
网上有的说不能group by (我需要group by)
有的说要加时间窗 (不加时间窗行不行)
可是我这个就是一个非常简单的东西。不知道要怎么改啊
|
|