ighack 发表于 2020-5-25 09:43:19

flink 报AppendStreamTableSink requires that Table has only insert changes

本帖最后由 ighack 于 2020-5-25 09:45 编辑

tableEnv.connect(new Kafka()
                        .version("universal")
                        .startFromEarliest()
                        .topic(topic)
                        .properties(properties)
                        )
      .withFormat(new Json().failOnMissingField(false))
      .withSchema(new Schema().field("name",DataTypes.STRING())
                              .field("num",DataTypes.INT()))
      .inAppendMode()
      .createTemporaryTable("usert")

    val result = tableEnv.sqlQuery("SELECT name, sum(num) as onum FROM usert group by name")

tableEnv.connect(new Kafka()
      .version("universal")
      .topic(sink_topic)
      .properties(props)
      .sinkPartitionerRoundRobin()
    )
      .withFormat(new Json().failOnMissingField(false))
      .withSchema(new Schema().field("name",DataTypes.STRING())
                               .field("onum",DataTypes.INT()))
      .inAppendMode()
      .createTemporaryTable("userp")


      result.insertInto("userp")
网上有的说不能group by    (我需要group by)
有的说要加时间窗   (不加时间窗行不行)
可是我这个就是一个非常简单的东西。不知道要怎么改啊

阿飞 发表于 2020-5-25 18:08:08

模式改为这个试试:.inUpsertMode()

ighack 发表于 2020-5-26 08:14:36

阿飞 发表于 2020-5-25 18:08
模式改为这个试试:.inUpsertMode()

Unknown value for property 'update-mode'.

美丽天空 发表于 2020-5-26 11:42:49

感谢分享
页: [1]
查看完整版本: flink 报AppendStreamTableSink requires that Table has only insert changes