分享

sparkstreaming结果怎么和传入的数据关联上?

dlh 发表于 2015-12-7 14:52:01 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 10 24919
本帖最后由 dlh 于 2015-12-7 15:35 编辑

我用sparkstreaming对数据库里的数据做分析,然后输出分析结果。       我之前测试验证都是用netcat输入查询条件,sparkstreaming根据查询条件从数据库取出结果做分析,然后把结果写文件。可是我怎么知道文件里的结果是根据哪个条件分析出来?

      现在我要做成webservice的方式:调用接口传入参数,接口返回分析结果,我不知道怎么实现这样的方式。

不知道我问题描述清楚了吗?就是跟查询似得,条件结果同步输出。


我的问题说白了就是:用户输入数据,怎么获取对应的结果

已有(10)人评论

跳转到指定楼层
regan 发表于 2015-12-7 18:21:32
在这里你用的数据库是什么?我不是很理解你为什么用sparkStreaming查询数据库????
当然加入你确实有这样的需求,sparkStreaming时在你设置的时间窗口内收集数据,到了时间窗口将会自动将这些收集到的数据提交job,job提交集群计算默认保存的文件名称是-getMilliones,如-1449463527951为文件夹,文件夹中是分区文件,如果没有数据提交那只有一个_SUCCESS标志文件.对于你的第一个问题:条件结果对应,可以这样来实现,我们先参考saveAsTextFiles,有prefix和suffix两个参数,你可以将你传入的条件作为prefix
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
  val saveFunc = (rdd: RDD[T], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsTextFile(file)
  }
  this.foreachRDD(saveFunc)
}
这样得到的结果就是:文件夹名称是"条件+后缀",文件夹里面就是你条件对应的结果.

如果你改成webservice来做,比如说,你用Servlet调用或则其他,在servlet中你可以得到你传入的查询条件(什么是Servlet楼主应该知道),然后在servlet中新建socket,将条件发送给sparkStreamingContext,如下
ssc.socketTextStream("192.168.1.12", 6780)
spark收到传入的条件,在RDD的treansaction操作中可以根据条件查询数据库,在将结果保存HDFS,对于你说的返回结果
解决办法是,启动一个线程从HDFS中读去结果文件

回复

使用道具 举报

zeno_wgs 发表于 2015-12-7 20:36:45
基本思路同1楼,处理原则就是sparkstreaming处理数据后生成的文件是在hdfs上按目录存储的(目录下有一个_SUCCESS文件和1个或多个partxxxx文件,其中partxxxx文件就是数据文件),是可以找到与数据传入对应的的数据文件夹的。
另外,不建议通过查询数据库来生成sparkstreaming的输入数据,这样会对业务数据库带来负担。建议采取数据库日志读取工具,直接生成数据库日志文件作为sparkstreaming的数据输入数据,可以以文件方式或者通过socket协议给到sparkstraming,供参考。
回复

使用道具 举报

dlh 发表于 2015-12-8 08:56:59
regan 发表于 2015-12-7 18:21
在这里你用的数据库是什么?我不是很理解你为什么用sparkStreaming查询数据库????
当然加入你确实有这 ...

谢谢你详细的回答,我想在问一下,我昨天想到的两个方法是否可行:
1、自己写socket链接传条件,我用得是hbase,把结果和条件(foreachrdd里应该能得到条件吧,怎么得到我还不知道)写入hbase数据库,自己在根据条件查hbase取结果。
2、用kafka,把条件send给streaming,再foreachrdd里条件和结果也用kafka发出去,然后在从kafka里找结果(感觉这个有点大材小用,而且还得部署维护kafka)
回复

使用道具 举报

dlh 发表于 2015-12-8 08:58:08
zeno_wgs 发表于 2015-12-7 20:36
基本思路同1楼,处理原则就是sparkstreaming处理数据后生成的文件是在hdfs上按目录存储的(目录下有一个_SU ...

非常感谢
回复

使用道具 举报

regan 发表于 2015-12-8 09:17:41
dlh 发表于 2015-12-8 08:56
谢谢你详细的回答,我想在问一下,我昨天想到的两个方法是否可行:
1、自己写socket链接传条件,我用得 ...

如果你真的要用socket传条件给sparkStreaming,在sparkStreaming中是可以得到你传入的条件的,通过方法:
valdstream = ssc.socketTextStream("ip",9999)
可以得到你传入的条件,socketTextStream方法会返回一个DStream,通过DStream的map方法可以遍历你传入的条件
dstream
.map(condition =>{
  //这里可以使用condition条件做你想做的是,如查询数据库等操作,返回结果,怎样操作结果你也可以在这里面写,如保存HBase
})
像你所说再根据条件保存HBASE,完全可以,你只需知道怎么根据你的条件去从hbase中查找出结果即可,但是我觉得直接保存HDFS或者Tachyon中,
其他业务可以通过查找HDFS或则Tachyon共享结果数据。


在此用kafka分布式消息系统,感觉不是很合理,毕竟就一个简单的业务,引入不必要的维护。
以上供参考

回复

使用道具 举报

dlh 发表于 2015-12-8 10:47:38
regan 发表于 2015-12-8 09:17
如果你真的要用socket传条件给sparkStreaming,在sparkStreaming中是可以得到你传入的条件的,通过方法:
...

非常感谢

回复

使用道具 举报

dlh 发表于 2015-12-8 16:22:59
regan 发表于 2015-12-8 09:17
如果你真的要用socket传条件给sparkStreaming,在sparkStreaming中是可以得到你传入的条件的,通过方法:
...

我又遇到了问题:

从 sparkStreaming 接收到参数放到DStream A。我会把A经过很多的转换和行为,得到最终的DStream B,在B里把条件和结果写入hbase。但是B不知道原始条件是什么了。怎么让B知道A里存的原始条件。难道要在每个转换和行为的结果里都加上原始条件,一直传给B?

回复

使用道具 举报

regan 发表于 2015-12-8 18:46:09
本帖最后由 regan 于 2015-12-8 18:48 编辑
dlh 发表于 2015-12-8 16:22
我又遇到了问题:

从 sparkStreaming 接收到参数放到DStream A。我会把A经过很多的转换和行为,得到最 ...

直接传tuple啊,二维三维n维tuple都可以,就像这样A.map(t=>(t,t)).map(t=>(t,t,t))你要的条件的话,你就把条件保存到tuple中从沿着transaction操作传递下去就是了,解决办法千千万,找到解决的办法就好办了
回复

使用道具 举报

dlh 发表于 2015-12-8 19:08:05
regan 发表于 2015-12-8 18:46
直接传tuple啊,二维三维n维tuple都可以,就像这样A.map(t=>(t,t)).map(t=>(t,t,t))你要的条件的话,你就把 ...

但是这样会很影响性能,根据条件查出来的数据能达到好几亿,每条数据都带着条件压缩、传输,会很耗资源和时间的
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条