本帖最后由 坎蒂丝_Swan 于 2014-12-29 16:09 编辑
问题导读
学习在Spark Streaming下如何自定义Receivers?
自定义一个Receiver
- class SocketTextStreamReceiver(host: String, port: Int(
- extends NetworkReceiver[String]
- {
- protected lazy val blocksGenerator: BlockGenerator =
- new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
-
- protected def onStart() = {
- blocksGenerator.start()
- val socket = new Socket(host, port)
- val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
- var data: String = dataInputStream.readLine()
- while (data != null) {
- blocksGenerator += data
- data = dataInputStream.readLine()
- }
- }
-
- protected def onStop() {
- blocksGenerator.stop()
- }
- }
复制代码
An Actor as Receiver- class SocketTextStreamReceiver (host:String,
- port:Int,
- bytesToString: ByteString => String) extends Actor with Receiver {
-
- override def preStart = IOManager(context.system).connect(host, port)
-
- def receive = {
- case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
- }
-
- }
复制代码
A Sample Spark Application- val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
- Seconds(batchDuration))
- //使用自定义的receiver
- val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
- "localhost", 8445))
-
- //或者使用这个自定义的actor Receiver
- val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
- "localhost",8445, z => z.utf8String)),"SocketReceiver") */
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
复制代码
提交成功之后,启动Netcat测试一下
- $ nc -l localhost 8445 hello world hello hello
复制代码
下面是合并多个输入流的方法:
- val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
- "localhost",8445, z => z.utf8String)),"SocketReceiver")
-
- // Another socket stream receiver
- val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
- "localhost",8446, z => z.utf8String)),"SocketReceiver")
-
- val union = lines.union(lines2)
复制代码
本文转自岑玉海 http://www.cnblogs.com/cenyuhai/p/3577583.html |