Spark streaming 拉取mysql数据库表问题
最近在验证spark streaming 读取mysql库表的测试。下面是我的代码@Slf4jpublic class StreamingDemo {
private static JavaStreamingContext ssc = null;
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark streaming demo");
ssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));
JavaDStream<VehonwayRt> dStream = ssc.queueStream(getQueue());
JavaDStream<VehonwayRt> streams = dStream.window(Durations.seconds(60), Durations.seconds(60));
streams.count();
streams.foreachRDD(new VoidFunction<JavaRDD<VehonwayRt>>() {
@Override
public void call(JavaRDD<VehonwayRt> vehonwayRtJavaRDD) throws Exception {
vehonwayRtJavaRDD.foreachPartition(new VoidFunction<Iterator<VehonwayRt>>() {
@Override
public void call(Iterator<VehonwayRt> vehonwayRtIterator) throws Exception {
ITopicAnalyseDao topicAnalyseDao = new TopicAnalyseDaoImpl();
List<VehonwayRt> list = IteratorUtils.toList(vehonwayRtIterator);
topicAnalyseDao.insertVehonwayRt(list);
}
});
}
});
ssc.start();
try {
ssc.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Queue<JavaRDD<VehonwayRt>> getQueue() {
Queue<JavaRDD<VehonwayRt>> queue = new ArrayDeque<>();
List<Object> list = JDBCHelper.getInstance().executeQueryData(SqlsCollection.onway_sql2);
JSONArray jsonArray = JSONArray.parseArray(JSON.toJSONString(list));
List<VehonwayRt> vehonwayRts = jsonArray.toJavaList(VehonwayRt.class);
JavaRDD<VehonwayRt> rdd = ssc.sparkContext().parallelize(vehonwayRts);
queue.add(rdd);
System.out.println(" size ==== " + vehonwayRts.size() + " " + queue.size());
return queue;
}
}
我是定义了getQueue方法,这个方法主要就是从mysql表中拉取数据。然后定义了ssc,间隔时间是60s,ssc执行getQueue生成队列数据流。但是我观察了日志,getQueue在开始执行了一次,后面就没有执行了。我的间隔时间是60s,理论上程序应该每60s就执行一下getQueue 拉取数据吧。
看下队列是否一直产生数据,产生多少数据。最好把队列先简化为一个不断产生数据的数据结构。然后在尝试。
页:
[1]