Spark Streaming: receiving from multiple TCP sources

Fortitude posted on 2015-12-14 17:01:28
Does Spark Streaming support receiving from multiple socket sources with a single JavaStreamingContext? This is what I wrote in my code:
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("10.129.10.101", 9999);
JavaReceiverInputDStream<String> lines2 = jssc.socketTextStream("10.129.10.102", 9999);
lines.union(lines2);

...
Running this code produces the following error:
3283 [sparkDriver-akka.actor.default-dispatcher-2] ERROR akka.actor.OneForOneStrategy  -
java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:168)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:256)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:168)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:78)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:76)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Please advise on how to receive from multiple TCP data sources.

arsenduan posted on 2015-12-14 18:07:57
Change lines.union(lines2); to the following:
JavaDStream<String> union = lines.union(lines2);
If that still does not work, the problem is in your environment.





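regan posted on 2015-12-14 18:22:52
The union operation returns a UnionDStream. I tested it with multiple data sources and it works fine, using simple netcat servers as the sources. First, here is my code: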
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("spark://192.168.1.12:7077").setAppName("StreamingTest")
    // val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(60))
    // Create a DStream that will connect to hostname:port, like localhost:9999
    // Receive data from the two socket ports
    val socket1 = ssc.socketTextStream("192.168.1.12", 6781)
    val socket2 = ssc.socketTextStream("192.168.1.12", 6782)

    val sockets = socket1.union(socket2)

    sockets.print()

    ssc.start()
    ssc.awaitTermination()

  }

}
The code receives data from ports 6781 and 6782 on 192.168.1.12.
Start nc in two separate consoles with nc -l 192.168.1.12 6781 and nc -l 192.168.1.12 6782:
[yun@master ~]$ nc -l 192.168.1.12 6782
aaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaa


[yun@master bin]$ nc -l 192.168.1.12 6781
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb

Then type some text in each console and press Enter to send it.
Spark Streaming received the data successfully:
-------------------------------------------
Time: 1450065720000 ms
-------------------------------------------
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
aaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaa
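
If nc is not available on the test machine, a small plain-Java stand-in can play the same role as nc -l in the test above. This is only a minimal sketch: the class name LineServer, the method pipeLines, and the default port are hypothetical and do not come from the thread. It accepts a single client (the Spark receiver) and forwards every line typed on standard input. Unlike nc -l 192.168.1.12 6781, it binds to all local interfaces rather than one address.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class LineServer {
    /**
     * Waits for one client on the server socket, then forwards every line
     * read from the input to that client, terminating each line with '\n'.
     */
    static void pipeLines(BufferedReader input, ServerSocket server) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.print(line);
                out.print('\n');
                out.flush(); // send each line immediately, like an interactive nc session
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // The port comes from the first argument; 6781 is only an example default.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 6781;
        try (ServerSocket server = new ServerSocket(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            pipeLines(stdin, server);
        }
    }
}
```

Running one instance per port (for example java LineServer 6781 and java LineServer 6782 in two consoles) would give the two sources that the streaming job connects to.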



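regan posted on 2015-12-14 18:26:48
Also, the error you reported, "[sparkDriver-akka.actor.default-dispatcher-2] ERROR akka.actor.OneForOneStrategy - java.lang.NullPointerException at", is clearly a NullPointerException raised on the driver side. Check your code carefully for anything that could cause a null error. If you find nothing, check carefully whether your cluster environment is set up correctly.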
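
The stack trace in the question places the NullPointerException inside the function passed to maxBy in DStreamGraph.getMaxInputStreamRememberDuration, which suggests that one of the values being compared was null. The plain-Java sketch below is only a loose analogy for that failure mode, not Spark's code: the Source class, its rememberMillis field, and maxByRemember are hypothetical names chosen for illustration. It shows how taking a maximum over a list fails as soon as one element's key was never set.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class MaxByNullDemo {
    /** Hypothetical stand-in for a stream; this is not a Spark class. */
    static final class Source {
        final String name;
        final Long rememberMillis; // null models a value that was never set

        Source(String name, Long rememberMillis) {
            this.name = name;
            this.rememberMillis = rememberMillis;
        }
    }

    /** Returns the source with the largest rememberMillis, similar in spirit to Scala's maxBy. */
    static Source maxByRemember(List<Source> sources) {
        // Unboxing a null Long inside the key function throws NullPointerException.
        return Collections.max(sources,
                Comparator.comparingLong((Source s) -> s.rememberMillis));
    }

    public static void main(String[] args) {
        // Both keys are set, so the comparison succeeds.
        List<Source> ok = Arrays.asList(
                new Source("lines", 2000L), new Source("lines2", 4000L));
        System.out.println(maxByRemember(ok).name);

        // One key is null, so the comparison fails inside the key function.
        List<Source> broken = Arrays.asList(
                new Source("lines", 2000L), new Source("lines2", null));
        try {
            maxByRemember(broken);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException while comparing sources");
        }
    }
}
```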

Fortitude posted on 2015-12-14 18:37:19
Thanks to both of you for the answers. The problem is solved; writing it as follows works.
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("10.129.10.101", 9999);
JavaReceiverInputDStream<String> lines2 = jssc.socketTextStream("10.129.10.102", 9999);
JavaDStream<String> union = lines.union(lines2);