分享

关于flink的JDBCAppendTableSink的问题

chenyui 2019-6-13 15:41:04 发表于 疑问解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 6171
为什么一用JDBCAppendTableSink作为sink,MySQL就接不到数据呢。
换成继承RichSinkFunction类,就可以写入MySQL数据。

已有(1)人评论

跳转到指定楼层
yaojiank 发表于 2019-6-15 08:55:22

试试下面代码
[mw_shl_code=java,true]package com.aboutyun.flink;



import java.sql.Timestamp;

import java.util.Date;

import java.util.Properties;

import java.util.concurrent.TimeUnit;



import org.apache.commons.lang3.StringUtils;

import org.apache.flink.api.common.functions.FilterFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.api.java.tuple.Tuple4;

import org.apache.flink.core.fs.FileSystem.WriteMode;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.TimestampAssigner;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.TableEnvironment;

import org.apache.flink.table.api.WindowedTable;

import org.apache.flink.table.api.java.StreamTableEnvironment;



public class stream_sql {

    public static void main(String[] args) throws Exception {



        Properties pro = new Properties();

        pro.put("bootstrap.servers", JDBCTestBase.kafka_hosts);

        pro.put("zookeeper.connect", JDBCTestBase.kafka_zookper);

        pro.put("group.id", JDBCTestBase.kafka_group);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // env.getConfig().disableSysoutLogging(); //设置此可以屏蔽掉日记打印情况

        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.enableCheckpointing(5000);



        DataStream<String> sourceStream = env

                    .addSource(new FlinkKafkaConsumer08<String>(JDBCTestBase.kafka_topic, new SimpleStringSchema(), pro));



        DataStream<Tuple3<Long, String, Long>> sourceStreamTra = sourceStream.filter(new FilterFunction<String>() {

            @Override

            public boolean filter(String value) throws Exception {

                return StringUtils.isNotBlank(value);

            }

        }).map(new MapFunction<String, Tuple3<Long, String, Long>>() {

            @Override

            public Tuple3<Long, String, Long> map(String value) throws Exception {

                String temp = value.replaceAll("(\\(|\\))", "");

                String[] args = temp.split(",");

                try {

                    return new Tuple3<Long, String, Long>(Long.valueOf(args[2]), args[0].trim(), Long.valueOf(args[1]));

                    

                } catch (Exception e) {

                    // TODO Auto-generated catch block

                    e.printStackTrace();

                    return new Tuple3<Long, String, Long>(System.currentTimeMillis(), args[0].trim(),0L);

                    

                }

            }

        });

        //設置将哪个字段用于eventTime

        DataStream<Tuple3<Long, String, Long>> withTimestampsAndWatermarks = sourceStreamTra

                .assignTimestampsAndWatermarks(new FirstTandW());

        //内置参数rowtime.rowtime就是EventTime protime是ProcessingTime

        tableEnv.registerDataStream("wiki_table", withTimestampsAndWatermarks, "etime,name, num,rowtime.rowtime");

        withTimestampsAndWatermarks.print();

        // define sink for room data and execute query

        JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setDrivername(JDBCTestBase.DRIVER_CLASS)

                .setDBUrl(JDBCTestBase.DB_URL).setQuery("INSERT INTO wiki (avg,time) VALUES (?,?)")

                .setParameterTypes(Types.LONG, Types.SQL_TIMESTAMP).build();

        //执行查询

        Table result = tableEnv.sqlQuery(JDBCTestBase.SQL_MIN);

        //写入csv

//        result.writeToSink(new CsvTableSink("D:/a.csv", // output path

//                "|", // optional: delimit files by '|'

//                1, // optional: write to a single file

//                WriteMode.OVERWRITE)); // optional: override existing files

        //写入数据库

        result.writeToSink(sink);

        

        env.execute();



            

  }

}[/mw_shl_code]

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条