在这里你用的数据库是什么?我不是很理解你为什么用sparkStreaming查询数据库????
当然加入你确实有这样的需求,sparkStreaming时在你设置的时间窗口内收集数据,到了时间窗口将会自动将这些收集到的数据提交job,job提交集群计算默认保存的文件名称是-getMilliones,如-1449463527951为文件夹,文件夹中是分区文件,如果没有数据提交那只有一个_SUCCESS标志文件.对于你的第一个问题:条件结果对应,可以这样来实现,我们先参考saveAsTextFiles,有prefix和suffix两个参数,你可以将你传入的条件作为prefix
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
this.foreachRDD(saveFunc)
}
这样得到的结果就是:文件夹名称是"条件+后缀",文件夹里面就是你条件对应的结果.
如果你改成webservice来做,比如说,你用Servlet调用或则其他,在servlet中你可以得到你传入的查询条件(什么是Servlet楼主应该知道),然后在servlet中新建socket,将条件发送给sparkStreamingContext,如下
ssc.socketTextStream("192.168.1.12", 6780)
spark收到传入的条件,在RDD的treansaction操作中可以根据条件查询数据库,在将结果保存HDFS,对于你说的返回结果
解决办法是,启动一个线程从HDFS中读去结果文件
|