分享

Spark streaming 拉取mysql数据库表问题

最近在验证spark streaming 读取mysql库表的测试。下面是我的代码@Slf4j
public class StreamingDemo {

    private static JavaStreamingContext ssc = null;

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[4]");
        sparkConf.setAppName("spark streaming demo");

        ssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

        JavaDStream<VehonwayRt> dStream = ssc.queueStream(getQueue());

        JavaDStream<VehonwayRt> streams = dStream.window(Durations.seconds(60), Durations.seconds(60));

        streams.count();
        streams.foreachRDD(new VoidFunction<JavaRDD<VehonwayRt>>() {
            @Override
            public void call(JavaRDD<VehonwayRt> vehonwayRtJavaRDD) throws Exception {
                vehonwayRtJavaRDD.foreachPartition(new VoidFunction<Iterator<VehonwayRt>>() {
                    @Override
                    public void call(Iterator<VehonwayRt> vehonwayRtIterator) throws Exception {
                        ITopicAnalyseDao topicAnalyseDao = new TopicAnalyseDaoImpl();
                        List<VehonwayRt> list = IteratorUtils.toList(vehonwayRtIterator);
                        topicAnalyseDao.insertVehonwayRt(list);
                    }
                });
            }
        });


        ssc.start();

        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


    public static Queue<JavaRDD<VehonwayRt>> getQueue() {
        Queue<JavaRDD<VehonwayRt>> queue = new ArrayDeque<>();
        List<Object> list = JDBCHelper.getInstance().executeQueryData(SqlsCollection.onway_sql2);

        JSONArray jsonArray = JSONArray.parseArray(JSON.toJSONString(list));
        List<VehonwayRt> vehonwayRts = jsonArray.toJavaList(VehonwayRt.class);

        JavaRDD<VehonwayRt> rdd = ssc.sparkContext().parallelize(vehonwayRts);
        queue.add(rdd);
        System.out.println(" size    ==== " + vehonwayRts.size() + "     " + queue.size());
        return queue;
    }
}
我是定义了getQueue方法,这个方法主要就是从mysql表中拉取数据。然后定义了ssc,间隔时间是60s,ssc执行getQueue生成队列数据流。但是我观察了日志,getQueue在开始执行了一次,后面就没有执行了。我的间隔时间是60s,理论上程序应该每60s就执行一下getQueue 拉取数据吧。

已有(1)人评论

跳转到指定楼层
s060403072 发表于 2019-6-3 16:00:28
看下队列是否一直产生数据,产生多少数据。最好把队列先简化为一个不断产生数据的数据结构。然后在尝试。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条