搜索
搜 索
本版
文章
帖子
用户
图文精华
hadoop-2.6.0+zookeeper-3.4.6+hbase-1.0.0+hive-1.1.0完全分布 ...
首页
Portal
专题
BBS
面试
更多
登录
注册
用户组:游客
主题
帖子
云币
我的帖子
我的收藏
我的好友
我的勋章
设置
退出
导读
淘贴
博客
群组
社区VIP
APP下载
今日排行
本周排行
本周热帖
本月排行
本月热帖
会员排行
About云-梭伦科技
»
专题
›
交流区
›
技术交流
›
Spark
›
疑问解答
›
Spark streaming 拉取mysql数据库表问题
0
1
0
分享
Spark streaming 拉取mysql数据库表问题
xiaobaiyang
2019-6-3 14:00:21
发表于
疑问解答
[显示全部楼层]
阅读模式
关闭右栏
1
4227
最近在验证spark streaming 读取mysql库表的测试。下面是我的代码@Slf4j
public class
StreamingDemo {
private static
JavaStreamingContext ssc =
null
;
public static void
main(String[] args) {
SparkConf sparkConf =
new
SparkConf();
sparkConf.setMaster(
"local[4]"
);
sparkConf.setAppName(
"spark streaming demo"
);
ssc =
new
JavaStreamingContext(sparkConf, Durations.seconds(
60
));
JavaDStream<VehonwayRt> dStream = ssc.queueStream(getQueue());
JavaDStream<VehonwayRt> streams = dStream.window(Durations.seconds(
60
), Durations.seconds(
60
));
streams.count();
streams.foreachRDD(
new
VoidFunction<JavaRDD<VehonwayRt>>() {
@Override
public void
call(JavaRDD<VehonwayRt> vehonwayRtJavaRDD)
throws
Exception {
vehonwayRtJavaRDD.foreachPartition(
new
VoidFunction<Iterator<VehonwayRt>>() {
@Override
public void
call(Iterator<VehonwayRt> vehonwayRtIterator)
throws
Exception {
ITopicAnalyseDao topicAnalyseDao =
new
TopicAnalyseDaoImpl();
List<VehonwayRt> list = IteratorUtils.toList(vehonwayRtIterator);
topicAnalyseDao.insertVehonwayRt(list);
}
});
}
});
ssc.start();
try
{
ssc.awaitTermination();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
public static
Queue<JavaRDD<VehonwayRt>> getQueue() {
Queue<JavaRDD<VehonwayRt>> queue =
new
ArrayDeque<>();
List<Object> list = JDBCHelper.getInstance().executeQueryData(SqlsCollection.onway_sql2);
JSONArray jsonArray = JSONArray.parseArray(JSON.toJSONString(list));
List<VehonwayRt> vehonwayRts = jsonArray.toJavaList(VehonwayRt.
class
);
JavaRDD<VehonwayRt> rdd = ssc.sparkContext().parallelize(vehonwayRts);
queue.add(rdd);
System.out.println(
" size ==== "
+ vehonwayRts.size() +
" "
+ queue.size());
return
queue;
}
}
我是定义了getQueue方法,这个方法主要就是从mysql表中拉取数据。然后定义了ssc,间隔时间是60s,ssc执行getQueue生成队列数据流。但是我观察了日志,getQueue在开始执行了一次,后面就没有执行了。我的间隔时间是60s,理论上程序应该每60s就执行一下getQueue 拉取数据吧。
回复
使用道具
举报
提升卡
置顶卡
沉默卡
喧嚣卡
变色卡
千斤顶
显身卡
已有(1)人评论
电梯直达
正序浏览
s060403072
发表于 2019-6-3 16:00:28
看下队列是否一直产生数据,产生多少数据。最好把队列先简化为一个不断产生数据的数据结构。然后在尝试。
回复
使用道具
举报
显身卡
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
|
立即注册
本版积分规则
发表回复
回帖后跳转到最后一页
发表新帖
xiaobaiyang
高级会员
关注
26
主题
365
帖子
17
粉丝
TA的主题
JAVA 线程通信问题请教
2022-3-4
yarn.nodemanager.local-dirs 目录文件是否可以删除
2019-8-26
Spark streaming 拉取mysql数据库表问题
2019-6-3
生成HFile文件报错
2019-4-26
Hadoop2.7 + Hbase2.1 部署
2019-4-16
24小时热文
矩阵分析引论罗家洪(第四版)
互联网大厂年终福利曝光:看看别人家老板怎
像高手一样发言:七种常见工作场景的说话之
携程允许员工春节回乡办公2个月
数据治理实施方案
关闭
推荐
/2
中文版ChatGPT
1.无需魔法 2.提高编程效率 3.提高文档能力
查看 »
新手帮助
新手帮助:注册遇到问题,领取资源,加入铁粉群,不会使用搜索,如何获取积分等
查看 »
意见
反馈