levycui 发表于 2015-12-1 14:53:22

spark streaming源码分析: DStream相关API

问题导读
1、如何进行Input DStream创建的操作?
2、如何进行DStream操作?
3、如何进行K/V类型RDD转换操作?

static/image/hrline/3.gif

一、InputDStream创建的操作(StreamingContext.scala)
1、给定Receiver作为参数,创建ReceiverInputDStream,T为receiver接收到的数据类型

    def receiverStream(receiver: Receiver): ReceiverInputDStream = {
      withNamedScope("receiver stream") {
          new PluggableInputDStream(this, receiver)
      }
      }

2、根据参数生成akka actorstream接收数据

    def actorStream(
          props: Props,
          name: String,
          storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
          supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
      ): ReceiverInputDStream = withNamedScope("actor stream") {
      receiverStream(new ActorReceiver(props, name, storageLevel, supervisorStrategy))
      }

3、TCP socket

socketStream:converter是从socket输入流转换成元素T的迭代器的方法

    def socketStream(
          hostname: String,
          port: Int,
          converter: (InputStream) => Iterator,
          storageLevel: StorageLevel
      ): ReceiverInputDStream = {
      new SocketInputDStream(this, hostname, port, converter, storageLevel)
      }

socketTextStream:storageLevel默认是MEMORY_AND_DISK_SER_2,converter是从inputstream中按行读取转换成迭代器的固定方法

    def socketTextStream(
          hostname: String,
          port: Int,
          storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream = withNamedScope("socket text stream") {
      socketStream(hostname, port, SocketReceiver.bytesToLines, storageLevel)
      }

4、fileStream:filter:文件过滤器,newFileOnly:只读取新的文件。还有其他一些使用默认参数的方法。

    def fileStream[
      K: ClassTag,
      V: ClassTag,
      F <: NewInputFormat: ClassTag
      ] (directory: String,
         filter: Path => Boolean,
         newFilesOnly: Boolean,
         conf: Configuration): InputDStream[(K, V)] = {
      new FileInputDStream(this, directory, filter, newFilesOnly, Option(conf))
      }

一个以固定格式读取文件作为输入的接口

    def textFileStream(directory: String): DStream = withNamedScope("text file stream") {
      fileStream(directory).map(_._2.toString)
      }

与receiverInputDStream不同,它是以文件作为输入,所以不需要receiver去读取。而是直接根据path生成hadoopRDD,再将所有的RDD Union起来。也就是说,在一个batchDuration时间间隔内,就将这个间隔内新的file组合成一个RDD。

5、将多个DStream 联合,返回UnionDStream。compute方法就是将多个DStream中的Rdd union


    /**
       * Create a unified DStream from multiple DStreams of the same type and same slide duration.
       */
      def union(streams: Seq]): DStream = withScope {
      new UnionDStream(streams.toArray)
      }

6、transform:将dstreams中得到的所有rdds转换成一个RDD

    /**
       * Create a new DStream in which each RDD is generated by applying a function on RDDs of
       * the DStreams.
       */
      def transform(
          dstreams: Seq],
          transformFunc: (Seq], Time) => RDD
      ): DStream = withScope {
      new TransformedDStream(dstreams, sparkContext.clean(transformFunc))
      }

二、DStream操作(DStream.scala)

与RDD不同的是,DStream是以一个outputStream作为一个job。

那outputStream是如何产生的呢?在调用foreachRDD方法时通过注册将一个DStream在DStreamGraph中标记为outputStream。

那有哪些API会注册outputStream呢?

foreachRDD/print

saveAsNewAPIHadoopFiles/saveAsTextFiles

1、map/flatMap/filter/mapPartitions

与RDD类似,分别生成MappedDstream/FlatMappedDStream/FilteredDStream等,真正运算时根据receiverInputDStream的compute方法产生BlockRDD,再在这个RDD上赋予map的方法参数执行操作。

2、重新分区

方法最终是将BlockRDD进行重新分区

    /**
       * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
       * returned DStream has exactly numPartitions partitions.
       */
      def repartition(numPartitions: Int): DStream = ssc.withScope {
      this.transform(_.repartition(numPartitions))
      }

3、reduce:这个方法将DStream的每个RDD都执行reduceFunc方法,并最终每个RDD只有一个分区,返回的还是一个DStream

区别:RDD.scala的reduce方法是提交runJob的,返回一个确切的值。

    /**
       * Return a new DStream in which each RDD has a single element generated by reducing each RDD
       * of this DStream.
       */
      def reduce(reduceFunc: (T, T) => T): DStream = ssc.withScope {
      this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
      }

4、count:这个方法是将DStream中的每个RDD进行计数,返回一个包含技术的DStream

    /**
       * Return a new DStream in which each RDD has a single element generated by counting each RDD
       * of this DStream.
       */
      def count(): DStream = ssc.withScope {
      this.map(_ => (null, 1L))
            .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
            .reduceByKey(_ + _)
            .map(_._2)
      }

5、countByValue:类似count方法,只是该方法是按value值计数的

    def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering = null)
          : DStream[(T, Long)] = ssc.withScope {
      this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
      }

6、foreachRDD:foreachFunc是在一个RDD进行自定义的任何操作

    def foreachRDD(foreachFunc: RDD => Unit): Unit = ssc.withScope {
      val cleanedF = context.sparkContext.clean(foreachFunc, false)
      this.foreachRDD((r: RDD, t: Time) => cleanedF(r))
      }


    def foreachRDD(foreachFunc: (RDD, Time) => Unit): Unit = ssc.withScope {
      // because the DStream is reachable from the outer object here, and because
      // DStreams can't be serialized with closures, we can't proactively check
      // it for serializability and so we pass the optional false to SparkContext.clean
      new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
      }

7、transform:在最终生成的RDD上执行transformFunc方法定义的转换操作

    def transform(transformFunc: RDD => RDD): DStream


    def transform(transformFunc: (RDD, Time) => RDD): DStream

8、transformWith:将自身DStream生成的RDD与other生成的RDD一起,执行transformWith方法。


    def transformWith(
          other: DStream, transformFunc: (RDD, RDD) => RDD
      ): DStream


    def transformWith(
          other: DStream, transformFunc: (RDD, RDD, Time) => RDD
      ): DStream

9、union联合


    def union(that: DStream): DStream = ssc.withScope {
      new UnionDStream(Array(this, that))
      }

10、saveAsObjectFiles/saveAsTextFiles

保存为文件

三、K/V类型RDD转换操作

1、groupByKey


    def groupByKey(): DStream[(K, Iterable)] = ssc.withScope {
      groupByKey(defaultPartitioner())
      }


    def groupByKey(numPartitions: Int): DStream[(K, Iterable)] = ssc.withScope {
      groupByKey(defaultPartitioner(numPartitions))
      }


    def groupByKey(partitioner: Partitioner): DStream[(K, Iterable)] = ssc.withScope {
      val createCombiner = (v: V) => ArrayBuffer(v)
      val mergeValue = (c: ArrayBuffer, v: V) => (c += v)
      val mergeCombiner = (c1: ArrayBuffer, c2: ArrayBuffer) => (c1 ++ c2)
      combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
          .asInstanceOf)]]
      }

2、reduceByKey

    def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]


    def reduceByKey(
          reduceFunc: (V, V) => V,
          numPartitions: Int): DStream[(K, V)]


    def reduceByKey(
          reduceFunc: (V, V) => V,
          partitioner: Partitioner): DStream[(K, V)]

3、combineByKey


    def combineByKey(
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiner: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
      val cleanedCreateCombiner = sparkContext.clean(createCombiner)
      val cleanedMergeValue = sparkContext.clean(mergeValue)
      val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
      new ShuffledDStream(
          self,
          cleanedCreateCombiner,
          cleanedMergeValue,
          cleanedMergeCombiner,
          partitioner,
          mapSideCombine)
      }

4、mapValues/flatMapValues

与RDD的操作类似,不解释

5、join

内部调用transformWith,transformWith的参数就是将两个参数RDD作join操作。


    def join(
          other: DStream[(K, W)],
          partitioner: Partitioner
      ): DStream[(K, (V, W))] = ssc.withScope {
      self.transformWith(
          other,
          (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
      )
      }

6、saveAsNewAPIHadoopFiles

保存到文件。

原文:http://blog.csdn.net/yueqian_zhu/article/details/49121489

页: [1]
查看完整版本: spark streaming源码分析: DStream相关API