xiaobaiyang 发表于 2019-6-3 14:00:21

Spark streaming 拉取mysql数据库表问题

最近在验证spark streaming 读取mysql库表的测试。下面是我的代码@Slf4j
public class StreamingDemo {

    private static JavaStreamingContext ssc = null;

    public static void main(String[] args) {
      SparkConf sparkConf = new SparkConf();
      sparkConf.setMaster("local");
      sparkConf.setAppName("spark streaming demo");

      ssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

      JavaDStream<VehonwayRt> dStream = ssc.queueStream(getQueue());

      JavaDStream<VehonwayRt> streams = dStream.window(Durations.seconds(60), Durations.seconds(60));

      streams.count();
      streams.foreachRDD(new VoidFunction<JavaRDD<VehonwayRt>>() {
            @Override
            public void call(JavaRDD<VehonwayRt> vehonwayRtJavaRDD) throws Exception {
                vehonwayRtJavaRDD.foreachPartition(new VoidFunction<Iterator<VehonwayRt>>() {
                  @Override
                  public void call(Iterator<VehonwayRt> vehonwayRtIterator) throws Exception {
                        ITopicAnalyseDao topicAnalyseDao = new TopicAnalyseDaoImpl();
                        List<VehonwayRt> list = IteratorUtils.toList(vehonwayRtIterator);
                        topicAnalyseDao.insertVehonwayRt(list);
                  }
                });
            }
      });


      ssc.start();

      try {
            ssc.awaitTermination();
      } catch (InterruptedException e) {
            e.printStackTrace();
      }

    }


    public static Queue<JavaRDD<VehonwayRt>> getQueue() {
      Queue<JavaRDD<VehonwayRt>> queue = new ArrayDeque<>();
      List<Object> list = JDBCHelper.getInstance().executeQueryData(SqlsCollection.onway_sql2);

      JSONArray jsonArray = JSONArray.parseArray(JSON.toJSONString(list));
      List<VehonwayRt> vehonwayRts = jsonArray.toJavaList(VehonwayRt.class);

      JavaRDD<VehonwayRt> rdd = ssc.sparkContext().parallelize(vehonwayRts);
      queue.add(rdd);
      System.out.println(" size    ==== " + vehonwayRts.size() + "   " + queue.size());
      return queue;
    }
}
我是定义了getQueue方法,这个方法主要就是从mysql表中拉取数据。然后定义了ssc,间隔时间是60s,ssc执行getQueue生成队列数据流。但是我观察了日志,getQueue在开始执行了一次,后面就没有执行了。我的间隔时间是60s,理论上程序应该每60s就执行一下getQueue 拉取数据吧。

s060403072 发表于 2019-6-3 16:00:28

看下队列是否一直产生数据,产生多少数据。最好把队列先简化为一个不断产生数据的数据结构。然后在尝试。
页: [1]
查看完整版本: Spark streaming 拉取mysql数据库表问题