分享

Apache Spark源码走读之4 -- DStream实时流数据处理

本帖最后由 pig2 于 2015-1-6 14:10 编辑
问题导读
1、流数据的特点是什么?
2、数据的存储有是被什么触发的?
3、如何理解StreamingContext构造函数的入参?





Spark Streaming能够对流数据进行近乎实时的速度进行数据处理。采用了不同于一般的流式数据处理模型,该模型使得Spark Streaming有非常高的处理速度,与storm相比拥有更高的吞能力。

本篇简要分析Spark Streaming的处理模型,Spark Streaming系统的初始化过程,以及当接收到外部数据时后续的处理步骤。

系统概述
流数据的特点
与一般的文件(即内容已经固定)型数据源相比,所谓的流数据拥有如下的特点

  • 数据一直处在变化中
  • 数据无法回退
  • 数据一直源源不断的涌进


DStream
如果要用一句话来概括Spark Streaming的处理思路的话,那就是"将连续的数据持久化,离散化,然后进行批量处理"。

让我们来仔细分析一下这么作的原因。

  • 数据持久化 将从网络上接收到的数据先暂时存储下来,为事件处理出错时的事件重演提供可能,
  • 离散化 数据源源不断的涌进,永远没有一个尽头,就像周星驰的喜剧中所说“崇拜之情如黄河之水绵绵不绝,一发而不可收拾”。既然不能穷尽,那么就将其按时间分片。比如采用一分钟为时间间隔,那么在连续的一分钟内收集到的数据集中存储在一起。
  • 批量处理 将持久化下来的数据分批进行处理,处理机制套用之前的RDD模式

DStream可以说是对RDD的又一层封装。如果打开DStream.scala和RDD.scala,可以发现几乎RDD上的所有operation在DStream中都有相应的定义。

作用于DStream上的operation分成两类

  • Transformation
  • Output 表示将输出结果,目前支持的有print, saveAsObjectFiles, saveAsTextFiles, saveAsHadoopFiles



DStreamGraph
有输入就要有输出,如果没有输出,则前面所做的所有动作全部没有意义,那么如何将这些输入和输出绑定起来呢?这个问题的解决就依赖于DStreamGraph,DStreamGraph记录输入的Stream和输出的Stream。
  1.   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  2.   private val outputStreams = new ArrayBuffer[DStream[_]]()
  3.   var rememberDuration: Duration = null
  4.   var checkpointInProgress = false
复制代码


outputStreams中的元素是在有Output类型的Operation作用于DStream上时自动添加到DStreamGraph中的。
outputStream区别于inputStream一个重要的地方就是会重载generateJob.

初始化流程
1.png



StreamingContext
StreamingContext是Spark Streaming初始化的入口点,主要的功能是根据入参来生成JobScheduler

设定InputStream
如果流数据源来自于socket,则使用socketStream。如果数据源来自于不断变化着的文件,则可使用fileStream

提交运行
  1. StreamingContext.start()
复制代码


数据处理
1.png

以socketStream为例,数据来自于socket。

SocketInputDstream启动一个线程,该线程使用receive函数来接收数据
  1. def receive() {                                                                                                         
  2.     var socket: Socket = null                                                                                             
  3.     try {                                                                                                                  
  4.       logInfo("Connecting to " + host + ":" + port)                                                                        
  5.       socket = new Socket(host, port)                                                                                      
  6.       logInfo("Connected to " + host + ":" + port)                                                                        
  7.       val iterator = bytesToObjects(socket.getInputStream())                                                               
  8.       while(!isStopped && iterator.hasNext) {                                                                              
  9.         store(iterator.next)                                                                                               
  10.       }                                                                                                                    
  11.       logInfo("Stopped receiving")                                                                                         
  12.       restart("Retrying connecting to " + host + ":" + port)                                                               
  13.     } catch {                                                                                                              
  14.       case e: java.net.ConnectException =>                                                                                 
  15.         restart("Error connecting to " + host + ":" + port, e)                                                            
  16.       case t: Throwable =>                                                                                                
  17.         restart("Error receiving data", t)                                                                                 
  18.     } finally {      
  19.    if (socket != null) {                                                                                                
  20.         socket.close()                                                                                                     
  21.         logInfo("Closed socket to " + host + ":" + port)                                                                  
  22.       }                                                                                                                    
  23.     }                                                                                                                     
  24.   }                                                                                                                        
  25. }
复制代码


接收到的数据会被先存储起来,存储最终会调用到BlockManager.scala中的函数,那么BlockManager是如何被传递到StreamingContext的呢?利用SparkEnv传入的,注意StreamingContext构造函数的入参。
搜狗截图14年12月18日2032_2.png



处理定时器
数据的存储有是被socket触发的。那么已经存储的数据被真正的处理又是被什么触发的呢?

记得在初始化StreamingContext的时候,我们指定了一个时间参数,那么用这个参数会构造相应的重复定时器,一旦定时器超时,调用generateJobs函数。
  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
复制代码


事件处理函数
  1. /** Processes all events */                                                                                             
  2.   private def processEvent(event: JobGeneratorEvent) {                                                                     
  3.     logDebug("Got event " + event)                                                                                         
  4.     event match {                                                                                                         
  5.       case GenerateJobs(time) => generateJobs(time)                                                                        
  6.       case ClearMetadata(time) => clearMetadata(time)                                                                     
  7.       case DoCheckpoint(time) => doCheckpoint(time)                                                                        
  8.       case ClearCheckpointData(time) => clearCheckpointData(time)                                                         
  9.     }                                                                                                                     
  10.   }   
复制代码


generteJobs
  1. private def generateJobs(time: Time) {                                                                                   
  2.     SparkEnv.set(ssc.env)                                                                                                  
  3.     Try(graph.generateJobs(time)) match {                                                                                 
  4.       case Success(jobs) =>                                                                                                
  5.         val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>                                             
  6.           val streamId = stream.id                                                                                         
  7.           val receivedBlockInfo = stream.getReceivedBlockInfo(time)                                                        
  8.           (streamId, receivedBlockInfo)                                                                                    
  9.         }.toMap                                                                                                            
  10.         jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))                                                   
  11.       case Failure(e) =>                                                                                                   
  12.         jobScheduler.reportError("Error generating jobs for time " + time, e)                                             
  13.     }                                                                                                                     
  14.     eventActor ! DoCheckpoint(time)                                                                                       
  15.   }         
复制代码

generateJobs->generateJob一路下去会调用到Job.run,在job.run中调用sc.runJob,在具体调用路径就不一一列出。
  1. private class JobHandler(job: Job) extends Runnable {
  2.     def run() {
  3.       eventActor ! JobStarted(job)
  4.       job.run()
  5.       eventActor ! JobCompleted(job)
  6.     }
  7.   }
复制代码

DStream.generateJob函数中定义了jobFunc,也就是在job.run()中使用到的jobFunc
  1.   private[streaming] def generateJob(time: Time): Option[Job] = {
  2.     getOrCompute(time) match {
  3.       case Some(rdd) => {
  4.         val jobFunc = () => {
  5.           val emptyFunc = { (iterator: Iterator[T]) => {} }
  6.           context.sparkContext.runJob(rdd, emptyFunc)
  7.         }
  8.         Some(new Job(time, jobFunc))
  9.       }
  10.       case None => None
  11.     }
  12.   }
复制代码

在这个流程中,DStreamGraph起到非常关键的作用,非常类似于TridentStorm中的graph.

在generateJob过程中,DStream会通过调用compute函数生成相应的RDD,SparkContext则是将基于RDD的抽象转换成为多个stage,而执行。

StreamingContext中一个重要的转换就是DStream到RDD的转换,而SparkContext中一个重要的转换是RDD到Stage及Task的转换。在这两个不同的抽象类中,要注意其中getOrCompute和compute函数的实现。


相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行

Apache Spark源码走读之3-- Task运行期之函数调用关系分析


Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

Apache Spark源码走读之13 -- hiveql on spark实现详解

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解

Apache Spark源码走读之17 -- 如何进行代码跟读

Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现


本文档转载自:http://www.cnblogs.com/hseagle/p/3673142.html
作者:徽沪一郎。


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条