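The `union` operation returns a `UnionDStream`. I have tested it with multiple data sources and it works fine. I used a simple netcat (nc) server for the test. My code is as follows: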
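```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://192.168.1.12:7077").setAppName("StreamingTest")
    // For a local test: each socket receiver occupies one core, so use more cores than receivers
    //val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(60))
    // Create a DStream that will connect to hostname:port, like localhost:9999
    // Receive text lines from two socket ports (Spark connects as a client to the nc listeners)
    val socket1 = ssc.socketTextStream("192.168.1.12", 6781)
    val socket2 = ssc.socketTextStream("192.168.1.12", 6782)
    // Merge both sources into a single DStream (a UnionDStream)
    val sockets = socket1.union(socket2)
    sockets.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The code receives data from two ports, 6781 and 6782, on 192.168.1.12.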
Start nc in two separate consoles with `nc -l 192.168.1.12 6781` and `nc -l 192.168.1.12 6782`:
```
[yun@master ~]$ nc -l 192.168.1.12 6782
aaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaa
```

```
[yun@master bin]$ nc -l 192.168.1.12 6781
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
```

Then type some text into each console. Each line is sent as soon as you press Enter.
Spark Streaming received the data successfully:

```
-------------------------------------------
Time: 1450065720000 ms
-------------------------------------------
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
aaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaa
```
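In the output, the line from port 6781 (`socket1`) is listed before the lines from port 6782 (`socket2`). As I understand Spark's implementation, each batch of a `UnionDStream` combines the parent streams' RDDs in the order the streams were given. `print()` then takes its first elements by scanning partitions in that order. The Spark-free Scala model below is a rough sketch of that behavior, not Spark code. `UnionBatchModel`, `unionBatch` and `firstElements` are hypothetical names, and representing a batch as a `Seq` of partitions is a simplifying assumption.

```scala
// Simplified, Spark-free model of one batch of a union of two streams.
// Assumption: a batch is a Seq of partitions, and each partition is a Seq of lines.
// This is only an illustration, NOT Spark's API.
object UnionBatchModel {
  type Partition = Seq[String]
  type Batch = Seq[Partition]

  // Union for the same batch time: the first stream's partitions come first,
  // followed by the second stream's partitions.
  def unionBatch(first: Batch, second: Batch): Batch = first ++ second

  // Rough stand-in for print(): shows at most `num` elements (default 10),
  // taken by scanning the partitions in order.
  def firstElements(batch: Batch, num: Int = 10): Seq[String] =
    batch.flatten.take(num)

  def main(args: Array[String]): Unit = {
    // Hypothetical batch contents, loosely mirroring the nc sessions above
    val socket1: Batch = Seq(Seq("b-line"))
    val socket2: Batch = Seq(Seq("a-line-1", "a-line-2"))
    firstElements(unionBatch(socket1, socket2)).foreach(println)
  }
}
```

To merge more than two sources, `StreamingContext.union` accepts a sequence of DStreams, for example `ssc.union(Seq(socket1, socket2))`.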