Reading guide
1. How are Input DStreams created?
2. How do DStream operations work?
3. How are transformations on K/V-type RDDs performed?
I. Creating InputDStreams (StreamingContext.scala)
1. receiverStream: given a Receiver as the argument, creates a ReceiverInputDStream; T is the type of the data the receiver receives.
[mw_shl_code=applescript,true]
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
  withNamedScope("receiver stream") {
    new PluggableInputDStream[T](this, receiver)
  }
}
[/mw_shl_code]
2. actorStream: creates an Akka actor stream that receives data, built from the given parameters.
[mw_shl_code=applescript,true]
def actorStream[T: ClassTag](
    props: Props,
    name: String,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
    supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
  ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
  receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}
[/mw_shl_code]
3. TCP socket
socketStream: converter is a function that turns the socket's input stream into an iterator over elements of type T.
[mw_shl_code=applescript,true]
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
[/mw_shl_code]
socketTextStream: storageLevel defaults to MEMORY_AND_DISK_SER_2, and converter is fixed to a function that reads the InputStream line by line and returns an iterator.
[mw_shl_code=applescript,true]
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
[/mw_shl_code]
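To make the role of the converter argument concrete, here is a minimal sketch of a function with the shape `InputStream => Iterator[String]`. The names `ConverterSketch` and `linesConverter` are hypothetical; this is not Spark's `SocketReceiver.bytesToLines`, only an illustration of the kind of function socketStream expects.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object ConverterSketch {
  // Hypothetical converter: wraps a raw input stream and yields its text lines lazily.
  // readLine() returns null at end of stream, which ends the iterator.
  def linesConverter(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }
}
```

A function like this could be passed as the converter parameter of socketStream, with T being String.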
4. fileStream: filter is a file filter, and newFilesOnly means that only new files are read. There are also overloads that use default parameters.
[mw_shl_code=applescript,true]
def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ] (directory: String,
     filter: Path => Boolean,
     newFilesOnly: Boolean,
     conf: Configuration): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
}
[/mw_shl_code]
textFileStream is an interface that reads files in a fixed (text) format as input:
[mw_shl_code=applescript,true]
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
[/mw_shl_code]
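Unlike a ReceiverInputDStream, this stream takes files as input, so no receiver is needed to read the data. Instead, a Hadoop RDD is generated directly from each path, and all of these RDDs are then unioned. In other words, within each batchDuration interval, the new files that appeared during that interval are combined into one RDD.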
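The batching behaviour described above can be modelled with plain Scala collections. This is only a sketch under simplifying assumptions: `FileEntry`, `batchFor` and the window rule (modification time inside the current batch interval) are hypothetical, and the real FileInputDStream logic is more involved.

```scala
object FileBatchSketch {
  // Hypothetical stand-in for a file in the monitored directory.
  final case class FileEntry(path: String, modTime: Long, lines: Seq[String])

  // Pick the files accepted by `filter` whose modification time falls inside
  // (batchTime - batchDuration, batchTime], then concatenate their contents.
  // The concatenation plays the role of "union all per-file RDDs into one RDD".
  def batchFor(
      files: Seq[FileEntry],
      batchTime: Long,
      batchDuration: Long,
      filter: String => Boolean): Seq[String] = {
    val selected = files.filter { f =>
      filter(f.path) && f.modTime > batchTime - batchDuration && f.modTime <= batchTime
    }
    selected.flatMap(_.lines)
  }
}
```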
5. union: combines multiple DStreams and returns a UnionDStream, whose compute method unions the RDDs of those DStreams.
[mw_shl_code=applescript,true]
/**
 * Create a unified DStream from multiple DStreams of the same type and same slide duration.
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope {
  new UnionDStream[T](streams.toArray)
}
[/mw_shl_code]
6. transform: applies transformFunc to the RDDs obtained from all of the given dstreams, producing a single RDD per batch.
[mw_shl_code=applescript,true]
/**
 * Create a new DStream in which each RDD is generated by applying a function on RDDs of
 * the DStreams.
 */
def transform[T: ClassTag](
    dstreams: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T] = withScope {
  new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
[/mw_shl_code]
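II. DStream operations (DStream.scala)
Unlike with RDDs, a job in the DStream world corresponds to an output stream: each output stream yields one job per batch.
So how does an output stream come into being? When foreachRDD is called, it registers a DStream with the DStreamGraph, which marks it as an output stream.
Which APIs register an output stream?
foreachRDD/print
saveAsNewAPIHadoopFiles/saveAsTextFiles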
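The relationship between registration and jobs can be sketched without Spark. Everything in this block (`MiniStream`, `MiniGraph`, and this `foreachRDD` helper) is a hypothetical model; it assumes only what the text states, namely that registering marks a stream as an output stream and that each output stream becomes one job per batch.

```scala
import scala.collection.mutable.ArrayBuffer

object OutputStreamSketch {
  // Hypothetical stand-in for a DStream: produces the batch data for a given time.
  final case class MiniStream[T](compute: Long => Seq[T])

  // Hypothetical stand-in for DStreamGraph: remembers the registered output actions.
  final class MiniGraph {
    private val outputs = ArrayBuffer.empty[Long => Unit]

    def register(action: Long => Unit): Unit = outputs += action

    // One deferred job per registered output stream for the batch at `time`.
    def generateJobs(time: Long): Seq[() => Unit] =
      outputs.toSeq.map(action => () => action(time))
  }

  // Registering is lazy: nothing is computed until a generated job is run.
  def foreachRDD[T](graph: MiniGraph, stream: MiniStream[T])(f: Seq[T] => Unit): Unit =
    graph.register(time => f(stream.compute(time)))
}
```

In this model, a stream that is never registered produces no jobs at all, which mirrors why a DStream pipeline without an output operation does nothing.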
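1. map/flatMap/filter/mapPartitions
As with RDDs, these produce a MappedDStream, FlatMappedDStream, FilteredDStream and so on. At actual computation time, the compute method of the ReceiverInputDStream produces a BlockRDD, and the function passed to map (or to the other operators) is then applied to that RDD.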
2. repartition
The method ultimately repartitions the underlying BlockRDD.
[mw_shl_code=applescript,true]
/**
 * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
 * returned DStream has exactly numPartitions partitions.
 */
def repartition(numPartitions: Int): DStream[T] = ssc.withScope {
  this.transform(_.repartition(numPartitions))
}
[/mw_shl_code]
3. reduce: applies reduceFunc to every RDD of the DStream, so that each resulting RDD has a single partition; the return value is still a DStream[T].
Difference: the reduce method in RDD.scala submits a job through runJob and returns a concrete value.
[mw_shl_code=applescript,true]
/**
 * Return a new DStream in which each RDD has a single element generated by reducing each RDD
 * of this DStream.
 */
def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
  this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
}
[/mw_shl_code]
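The trick of mapping every element to the same dummy key and then reducing by key can be sketched on one batch with plain collections. This is a model, not Spark code: `ReduceSketch`, `reduceByKey` and `reduceBatch` are hypothetical names, a batch is a plain Seq, and `()` stands in for the null key.

```scala
object ReduceSketch {
  // Stand-in for reduceByKey on a single batch: group by key, then reduce each group.
  def reduceByKey[K, V](batch: Seq[(K, V)])(f: (V, V) => V): Seq[(K, V)] =
    batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }.toSeq

  // Mirrors map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2):
  // every element shares one dummy key, so the whole batch collapses to one value.
  def reduceBatch[T](batch: Seq[T])(f: (T, T) => T): Seq[T] =
    reduceByKey(batch.map(x => ((), x)))(f).map(_._2)
}
```

Note that in this model an empty batch yields an empty result rather than an error, because there is no group to reduce.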
4. count: counts the elements of each RDD in the DStream and returns a DStream containing the counts.
[mw_shl_code=applescript,true]
/**
 * Return a new DStream in which each RDD has a single element generated by counting each RDD
 * of this DStream.
 */
def count(): DStream[Long] = ssc.withScope {
  this.map(_ => (null, 1L))
    .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
    .reduceByKey(_ + _)
    .map(_._2)
}
[/mw_shl_code]
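The union with a single `(null, 0L)` record in the code above looks odd at first; a plausible reading is that it guarantees a count of 0 for an empty batch instead of an empty result. The sketch below models this on plain collections; `CountSketch` and its helpers are hypothetical, and `()` stands in for the null key.

```scala
object CountSketch {
  // Stand-in for reduceByKey on a single batch.
  def reduceByKey[K, V](batch: Seq[(K, V)])(f: (V, V) => V): Seq[(K, V)] =
    batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }.toSeq

  // Mirrors count(): map each element to (key, 1L), append one (key, 0L) seed
  // record, then sum by key. The seed keeps the result non-empty for empty batches.
  def countBatch[T](batch: Seq[T]): Seq[Long] = {
    val ones = batch.map(_ => ((), 1L))
    val seeded = ones ++ Seq(((), 0L))
    reduceByKey(seeded)(_ + _).map(_._2)
  }
}
```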
5. countByValue: similar to count, except that it counts the occurrences of each distinct value.
[mw_shl_code=applescript,true]
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
    : DStream[(T, Long)] = ssc.withScope {
  this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
[/mw_shl_code]
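For comparison with count, the per-batch effect of countByValue can be sketched as follows. `CountByValueSketch` is a hypothetical name and the batch is modelled as a plain Seq; partitioning is ignored.

```scala
object CountByValueSketch {
  // Mirrors map(x => (x, 1L)).reduceByKey(_ + _) on one batch:
  // the element itself is the key, so each distinct value gets its own count.
  def countByValue[T](batch: Seq[T]): Map[T, Long] =
    batch.map(x => (x, 1L)).groupBy(_._1).map { case (v, ones) => v -> ones.map(_._2).sum }
}
```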
6. foreachRDD: foreachFunc is any user-defined operation to run on each RDD.
[mw_shl_code=applescript,true]
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
  val cleanedF = context.sparkContext.clean(foreachFunc, false)
  this.foreachRDD((r: RDD[T], t: Time) => cleanedF(r))
}
[/mw_shl_code]
[mw_shl_code=applescript,true]
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
}
[/mw_shl_code]
7. transform: applies the transformation defined by transformFunc to each RDD that the stream ultimately generates.
[mw_shl_code=applescript,true]
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
[/mw_shl_code]
8. transformWith: takes the RDD generated by this DStream together with the RDD generated by other and applies transformFunc to the pair.
[mw_shl_code=applescript,true]
def transformWith[U: ClassTag, V: ClassTag](
    other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
  ): DStream[V]
def transformWith[U: ClassTag, V: ClassTag](
    other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
  ): DStream[V]
[/mw_shl_code]
9. union
[mw_shl_code=applescript,true]
def union(that: DStream[T]): DStream[T] = ssc.withScope {
  new UnionDStream[T](Array(this, that))
}
[/mw_shl_code]
10. saveAsObjectFiles/saveAsTextFiles
Save the stream's contents to files.
III. Transformations on K/V-type RDDs (DStreams of key/value pairs)
1. groupByKey
[mw_shl_code=applescript,true]
def groupByKey(): DStream[(K, Iterable[V])] = ssc.withScope {
  groupByKey(defaultPartitioner())
}
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = ssc.withScope {
  groupByKey(defaultPartitioner(numPartitions))
}
[/mw_shl_code]
[mw_shl_code=applescript,true]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = ssc.withScope {
  val createCombiner = (v: V) => ArrayBuffer[V](v)
  val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
  val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
  combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
    .asInstanceOf[DStream[(K, Iterable[V])]]
}
[/mw_shl_code]
2. reduceByKey
[mw_shl_code=applescript,true]
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
def reduceByKey(
    reduceFunc: (V, V) => V,
    numPartitions: Int): DStream[(K, V)]
def reduceByKey(
    reduceFunc: (V, V) => V,
    partitioner: Partitioner): DStream[(K, V)]
[/mw_shl_code]
3. combineByKey
[mw_shl_code=applescript,true]
def combineByKey[C: ClassTag](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
  val cleanedCreateCombiner = sparkContext.clean(createCombiner)
  val cleanedMergeValue = sparkContext.clean(mergeValue)
  val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
  new ShuffledDStream[K, V, C](
    self,
    cleanedCreateCombiner,
    cleanedMergeValue,
    cleanedMergeCombiner,
    partitioner,
    mapSideCombine)
}
[/mw_shl_code]
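How the three functions passed to combineByKey cooperate, and how groupByKey can be expressed through them, may be easier to see in a small model. The sketch below assumes a batch is a Seq of partitions and ignores the partitioner and the shuffle entirely; `CombineByKeySketch` and its methods are hypothetical names, not Spark's implementation.

```scala
object CombineByKeySketch {
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C): Map[K, C] = {
    // Map side: within each partition, start a combiner on the first value of a key
    // and fold later values of the same key into it.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
      }
    }
    // Reduce side: merge the per-partition combiners of each key.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.updated(k, a.get(k).map(prev => mergeCombiner(prev, c)).getOrElse(c))
      }
    }
  }

  // groupByKey as a special case: the combiner is simply the collection of values.
  def groupByKey[K, V](partitions: Seq[Seq[(K, V)]]): Map[K, Vector[V]] =
    combineByKey[K, V, Vector[V]](partitions)(v => Vector(v), _ :+ _, _ ++ _)
}
```

In the same model, a per-key sum is `combineByKey(partitions)(identity, _ + _, _ + _)`, which is the shape reduceByKey takes.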
4. mapValues/flatMapValues
These behave like the RDD operations of the same names, so they are not explained further here.
5. join
Internally this calls transformWith; the function passed to transformWith joins the two RDDs it receives.
[mw_shl_code=applescript,true]
def join[W: ClassTag](
    other: DStream[(K, W)],
    partitioner: Partitioner
  ): DStream[(K, (V, W))] = ssc.withScope {
  self.transformWith(
    other,
    (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
  )
}
[/mw_shl_code]
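A consequence of building join on transformWith is that records are only matched within the same batch. The following sketch models that with plain collections; a stream is a Seq of batches, and `JoinSketch`, `joinBatch`, and this `transformWith` are hypothetical stand-ins rather than Spark APIs.

```scala
object JoinSketch {
  // Inner join of two key/value batches (the role played by rdd1.join(rdd2)).
  def joinBatch[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    for {
      (k, v) <- left
      (k2, w) <- right
      if k == k2
    } yield (k, (v, w))

  // Stand-in for transformWith: pairs up the batches of two streams that belong
  // to the same batch interval and applies f to each pair.
  def transformWith[A, B, C](s1: Seq[Seq[A]], s2: Seq[Seq[B]])(
      f: (Seq[A], Seq[B]) => Seq[C]): Seq[Seq[C]] =
    s1.zip(s2).map { case (a, b) => f(a, b) }

  // join expressed through transformWith, as in the source above.
  def join[K, V, W](s1: Seq[Seq[(K, V)]], s2: Seq[Seq[(K, W)]]): Seq[Seq[(K, (V, W))]] =
    transformWith(s1, s2)((a, b) => joinBatch(a, b))
}
```

In this model, a key that appears in batch 1 of one stream and batch 2 of the other is never joined.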
6. saveAsNewAPIHadoopFiles
Saves the stream's contents to files.
Original article: http://blog.csdn.net/yueqian_zhu/article/details/49121489