Try the code below. It consumes messages from Kafka, parses each line into an (etime, name, num) tuple, registers the stream as a table with an event-time attribute, runs a SQL query over it, and writes the result to a database through the JDBC append sink.
[mw_shl_code=java,true]package com.aboutyun.flink;

import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink; // only needed by the commented-out CSV sink below

public class stream_sql {

    public static void main(String[] args) throws Exception {
        // Kafka connection constants come from JDBCTestBase (a sketch follows the code block)
        Properties pro = new Properties();
        pro.put("bootstrap.servers", JDBCTestBase.kafka_hosts);
        pro.put("zookeeper.connect", JDBCTestBase.kafka_zookper);
        pro.put("group.id", JDBCTestBase.kafka_group);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // env.getConfig().disableSysoutLogging(); // uncomment to suppress sysout log output
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000);

        DataStream<String> sourceStream = env
                .addSource(new FlinkKafkaConsumer08<String>(JDBCTestBase.kafka_topic, new SimpleStringSchema(), pro));

        // Parse lines of the form "(name,num,etime)" into (etime, name, num) tuples
        DataStream<Tuple3<Long, String, Long>> sourceStreamTra = sourceStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return StringUtils.isNotBlank(value);
            }
        }).map(new MapFunction<String, Tuple3<Long, String, Long>>() {
            @Override
            public Tuple3<Long, String, Long> map(String value) throws Exception {
                String temp = value.replaceAll("(\\(|\\))", "");
                String[] fields = temp.split(",");
                try {
                    return new Tuple3<Long, String, Long>(Long.valueOf(fields[2].trim()), fields[0].trim(),
                            Long.valueOf(fields[1].trim()));
                } catch (Exception e) {
                    // Malformed record: fall back to the current time and a zero count
                    e.printStackTrace();
                    return new Tuple3<Long, String, Long>(System.currentTimeMillis(), fields[0].trim(), 0L);
                }
            }
        });

        // Choose which field supplies the event time and generate watermarks
        // (a sketch of FirstTandW follows the code block)
        DataStream<Tuple3<Long, String, Long>> withTimestampsAndWatermarks = sourceStreamTra
                .assignTimestampsAndWatermarks(new FirstTandW());

        // The built-in suffix ".rowtime" exposes the event time as a table column;
        // ".proctime" would expose processing time instead
        tableEnv.registerDataStream("wiki_table", withTimestampsAndWatermarks, "etime, name, num, rowtime.rowtime");
        withTimestampsAndWatermarks.print();

        // Define the JDBC sink that will receive the query result
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setDrivername(JDBCTestBase.DRIVER_CLASS)
                .setDBUrl(JDBCTestBase.DB_URL).setQuery("INSERT INTO wiki (avg,time) VALUES (?,?)")
                .setParameterTypes(Types.LONG, Types.SQL_TIMESTAMP).build();

        // Run the query
        Table result = tableEnv.sqlQuery(JDBCTestBase.SQL_MIN);

        // Alternatively, write the result to CSV:
        // result.writeToSink(new CsvTableSink("D:/a.csv", // output path
        //         "|", // optional: delimit fields by '|'
        //         1, // optional: write to a single file
        //         WriteMode.OVERWRITE)); // optional: override existing files

        // Write the result to the database
        result.writeToSink(sink);
        env.execute();
    }
}[/mw_shl_code]
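The post does not show the FirstTandW class it references. A minimal sketch of what it might look like, assuming the first tuple field f0 (the etime parsed above, in epoch milliseconds) carries the event time, built on Flink's BoundedOutOfOrdernessTimestampExtractor with an assumed 10-second out-of-orderness bound:

[mw_shl_code=java,true]package com.aboutyun.flink;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// A guess at FirstTandW (the original class is not shown in the post):
// extract f0 (the etime field) as the event-time timestamp and emit
// watermarks trailing the max seen timestamp by the allowed lateness.
public class FirstTandW extends BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, Long>> {

    public FirstTandW() {
        super(Time.seconds(10)); // the 10s out-of-orderness bound is an assumption
    }

    @Override
    public long extractTimestamp(Tuple3<Long, String, Long> element) {
        return element.f0; // epoch millis, per the map function above
    }
}[/mw_shl_code]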
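JDBCTestBase is also referenced but not shown; it only needs to hold the connection constants and the query string. A sketch with placeholder hosts, credentials, and an example SQL_MIN, all of which are assumptions to adapt to your environment. The query's two result columns line up with the sink's Types.LONG and Types.SQL_TIMESTAMP parameters, and the target MySQL table wiki with columns (avg, time) must already exist:

[mw_shl_code=java,true]package com.aboutyun.flink;

// Hypothetical constants holder; every value below is a placeholder,
// adjust to your own Kafka/ZooKeeper/MySQL setup.
public class JDBCTestBase {
    public static final String kafka_hosts = "node1:9092,node2:9092";
    public static final String kafka_zookper = "node1:2181,node2:2181";
    public static final String kafka_group = "flink-wiki-group";
    public static final String kafka_topic = "wiki";

    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    public static final String DB_URL = "jdbc:mysql://node1:3306/test?user=root&password=root";

    // Example query matching the sink's (LONG, SQL_TIMESTAMP) parameters:
    // the minimum num per one-minute event-time window plus the window end.
    public static final String SQL_MIN =
            "SELECT MIN(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) "
            + "FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";
}[/mw_shl_code]

With these pieces in place, a Kafka message like (page_a,25,1531380000000) becomes the tuple (1531380000000, page_a, 25), is assigned that timestamp as its event time, and is aggregated into the one-minute window containing it before the result row is appended to the wiki table.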