从源码角度解析SparkStreaming运行流程
问题导读:1. StreamingContext 如何获取数据?
2. receiverTrackerstart 底层如何实现?
3. receiverExecutorstart 底层如何实现?
4. StreamingContext 如何加载数据?
static/image/hrline/3.gif
解决方案:
获取数据
启动了receiverTracker.start去获取数据
jobGenerator启动job去使用数据计算
receiverTracker.start
在这里的receiverinputStreams其实已经实例化了。在inputDStream的时候已经往ssc.graph中添加了实例
实例化ReceiverTrackerActor,它负责RegisterReceiver(注册Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销Receiver)等事件的处理。
启动receiverExecutor(实际类是ReceiverLauncher,这名字起得。。),它主要负责启动Receiver,start方法里面调用了startReceivers方法吧。
receiverExecutor.start()
分发receiver到各个work上去,看上面的英文解释
看上面英文注释,看上面英文注释,看上面英文注释,看上面英文注释
分发各个receiver,然后启动receiver来接收数据,然后在receiver里面将数据存储在blockManage里面
加载数据
启动jobGenerator。
第一次启动会调用startFirstTime()
启动一个定时器Timer。定时器每隔一定时间eventActor发送GenerateJobs消息
启动定时器。里面是使用的一个线程,然后线程调用上面的传入的方法,(方法的功能就是,向eventActor发送一个GenerateJobs)。那个!是eventActor的一个方法,就是发送
下面是线程的方法,循环调用了callback(也就是)上面的向eventActor发消息
下面是线程的方法,循环调用了callback(也就是)上面的向eventActor发消息
如果事件触发了,就调用下面对应的方法
1、DStreamGraph生成jobs。
2、从stream那里获取接收到的Block信息。
3、调用submitJobSet方法提交作业。(其实只是作业的完成情况,在 graph.generateJobs(time)里面其实把作业完成了)
4、提交完作业之后,做一个CheckPoint。
先看DStreamGraph是怎么生成的jobs:(不同的DStream调用的outputStrem.generateJob方法不一样。看看DStream里面的union和print等方法。print调用的是ForEachDStream的generateJob。union(UnionDStream)调用的是DStream自带的)
union调用的是:大部分是调用的下面的这个:(DStream原生的方法)
print调用的是:
生成一个RDD。调用重写的compute。我们可以看到下面的
generatedRDDs.get(time).orElse 。如果这个时间time段的rdd已经准备好了。就不用去执行compute了。而是直接取。这个generatedRDDs 是一个hashmap是每个时间段的rdd。
dstream.foreachRDD什么时候会有多个rdd呢?
private var generatedRDDs = new HashMap] ()
返回头看:
总结
到这里就算结束了,最后来个总结吧,图例在下一章补上,这一章只是过程分析:
一个Receiver监控一个ip和port,如果要多机子并行监控的话,肯定每台机子都是一样的,所以这边是一个Receiver的Array,里面是每个Receiver,如果在开始的时候只定义了一个inputstream,那其实是启动一个机子在监控ip和port,如果要多机子并行监控,只能是通过不同的ip或不同的port来创建多个inputstream来实现并行。(源码里面是把Receiver做成一个RDD来分发到不同的机子上的,这个RDD里面的Receiver就是你多个Receiver的集合)
1、可以有多个输入,我们可以通过StreamingContext定义多个输入,比如我们监听多个(host,ip),可以给它们定义各自的处理逻辑和输出,输出方式不仅限于print方法,还可以有别的方法,saveAsTextFiles和saveAsObjectFiles。这块的设计是支持共享StreamingContext的。
(可以从InputDStream源码的 ssc.graph.addInputStream(this))看出,可以有多个输入
所以一个sparkstreaming程序,可以有多个监控输入。只要增加一个生成inputDStream就可以
2、StreamingContext启动了JobScheduler,JobScheduler启动ReceiverTracker和JobGenerator。
(ReceiverTracker用于分发接收器,在work上实时接收数据;JobGenerator启动一个线程(这个线程在driver端)在每隔一段时间就去使用Receiver接收的数据(使用数据的方法是启动一个job,job操作的rdd是从inputDStream的compute里面产生的))
3、ReceiverTracker是通过把Receiver包装成RDD的方式,发送到Executor端运行起来的,Receiver起来之后向ReceiverTracker发送RegisterReceiver消息。
3、Receiver把接收到的数据,通过ReceiverSupervisor保存。
4、ReceiverSupervisorImpl把数据写入到BlockGenerator的一个ArrayBuffer当中。
5、BlockGenerator内部每个一段时间(默认是200毫秒)就把这个ArrayBuffer构造成Block添加到blocksForPushing当中。
6、BlockGenerator的另外一条线程则不断的把加入到blocksForPushing当中的Block写入到BlockManager当中,并向ReceiverTracker发送AddBlock消息。
7、JobGenerator内部有个定时器,定期生成Job,通过DStream的id,把ReceiverTracker接收到的Block信息从BlockManager上抓取下来进行处理,这个间隔时间是我们在实例化StreamingContext的时候传进去的那个时间,在这个例子里面是Seconds(1)。
转自:csdn
作者:Spark的自由牧场
好文章
页:
[1]