分享

Spark(1.2.0) Streaming 集成 Kafka 总结

问题导读
1.spark中什么是stage?
2.什么是 kafka topic?
3.本文中kafka和消费者线程遵循哪些约束?





最近在做利用Spark streaming和Kafka进行数据分析的研究, 整理一些相应的开发文档, 做了一些代码实践。 本文特意将这些资料记录下来。
本文最后列出了一些参考的文档,实际调研中参考了很多的资料,并没有完全将它们记录下来, 只列出了主要的一些参考资料。 当前的版本:
  • Spark: 1.2.0
  • Kafka: 0.8.1.1
Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 有以下特点:
  • 易于使用 提供了和批处理一致的高级操作API,可以进行map, reduce, join, window。
  • 容错 Spark Streaming可以恢复你计算的状态, 包括lost work和operator state (比如 sliding windows)。 支持worker节点和driver 节点恢复。
  • Spark集成 可以结合批处理流和交互式查询。 可以重用批处理的代码。还可以直接使用内置的机器学习算法、图算法包来处理数据。 它可以接受来自文件系统, Akka actors, rsKafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源或者你自己定义的输入源。


streaming-arch1.png

它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果流。

streaming-flow2.png

Spark Streaming提供了一个高级的抽象模型,叫做discretized stream或者叫做DStream,它代表了一个持续的数据流。DStream既可以从Kafka, Flume, 和 Kinesis中产生, 或者在其它DStream上应用高级操作得到。 内部实现上一个DStream代表一个RDD序列。
一个简单例子
在我们开始进入编写我们自己的Spark Streaming程序细节之前, 让我们先快速的看一个简单的Sparking Streaming程序是什么样子的。 这个程序接收网络发过来的文本数据,让我们统计一下文本中单词的数量。 全部代码如下:

首先, 我们导入Spark Streaming类名以及StreamingContext的一些隐式转换到我们的环境中, 这样可以为我们需要的类(比如DStream)增加一些有用的方法。. StreamingContext是所有功能的主入口。 我们创建了一个本地StreamingContext, 它使用两个线程, 批处理间隔为1秒.

Scala代码
  1. import org.apache.spark._
  2. import org.apache.spark.streaming._
  3. import org.apache.spark.streaming.StreamingContext._
  4. // Create a local StreamingContext with two working thread and batch interval of 1 second.
  5. // The master requires 2 cores to prevent from a starvation scenario.
  6. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  7. val ssc = new StreamingContext(conf, Seconds(1))
复制代码

Java代码
  1. import org.apache.spark.*;
  2. import org.apache.spark.api.java.function.*;
  3. import org.apache.spark.streaming.*;
  4. import org.apache.spark.streaming.api.java.*;
  5. import scala.Tuple2;
  6. // Create a local StreamingContext with two working thread and batch interval of 1 second
  7. SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  8. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1))
复制代码


使用这个context, 我们可以创建一个DStream, 代表来自TCP源的流数据。需要指定主机名和端口(如 localhost 和 9999).
Scala代码
  1. // Create a DStream that will connect to hostname:port, like localhost:9999
  2. val lines = ssc.socketTextStream("localhost", 9999)
复制代码

Java代码
  1. // Create a DStream that will connect to hostname:port, like localhost:9999
  2. JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
复制代码

这一行代表从数据服务器接受到的数据流. DStream中每条记录是一行文本. 接下来, 我们想使用空格分隔每一行,这样就可以得到文本中的单词。
Scala代码
  1. // Split each line into words
  2. val words = lines.flatMap(_.split(" "))
复制代码
Java代码
  1. // Split each line into words
  2. JavaDStream<String> words = lines.flatMap(
  3.   new FlatMapFunction<String, String>() {
  4.     @Override public Iterable<String> call(String x) {
  5.       return Arrays.asList(x.split(" "));
  6.     }
  7.   });
复制代码


flatMap 是一个一对多的DStream操作, 它从源DStream中的每一个Record产生多个Record, 这些新产生的Record组成了一个新的DStream。 在我们的例子中, 每一行文本被分成了多个单词, 结果得到单词流DStream. 下一步, 我们想统计以下单词的数量.

Scala代码
  1. import org.apache.spark.streaming.StreamingContext._
  2. // Count each word in each batch
  3. val pairs = words.map(word => (word, 1))
  4. val wordCounts = pairs.reduceByKey(_ + _)
  5. // Print the first ten elements of each RDD generated in this DStream to the console
  6. wordCounts.print()
复制代码

Java代码
  1. // Count each word in each batch
  2. JavaPairDStream<String, Integer> pairs = words.map(
  3.   new PairFunction<String, String, Integer>() {
  4.     @Override public Tuple2<String, Integer> call(String s) throws Exception {
  5.       return new Tuple2<String, Integer>(s, 1);
  6.     }
  7.   });
  8. JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  9.   new Function2<Integer, Integer, Integer>() {
  10.     @Override public Integer call(Integer i1, Integer i2) throws Exception {
  11.       return i1 + i2;
  12.     }
  13.   });
  14. // Print the first ten elements of each RDD generated in this DStream to the console
  15. wordCounts.print();
复制代码

单词DStream 被mapped (one-to-one transformation) 成*(word, 1)对*的DStream ,然后reduced 得到每一批单词的频度. 最后, wordCounts.print()会打印出每一秒产生的一些单词的统计值。 注意当这些行执行时,Spark Streaming仅仅设置这些计算, 它并没有马上被执行。 当所有的计算设置完后,我们可以调用下面的代码启动处理

Scala代码
  1. ssc.start()             // Start the computation
  2. ssc.awaitTermination()  // Wait for the computation to terminate
复制代码
Java代码
  1. jssc.start();              // Start the computation
  2. jssc.awaitTermination();   // Wait for the computation to terminate
复制代码

完整的代码可以在例子 [NetworkWordCount](https://github.com/apache/spark/ ... workWordCount.scala) 中找到. 如果你已经下载并编译了Spark, 你可以按照下面的命令运行例子. 你要先运行Netcat工具作为数据服务器

Scala代码
  1. $ nc -lk 9999
复制代码
Java代码
  1. $ nc -lk 9999
复制代码


然后, 在另一个终端中, 你可以启动例子
Scala代码
  1. $ ./bin/run-example streaming.NetworkWordCount localhost 9999
复制代码
Java代码
  1. $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
复制代码

然后, 在netcat服务器中输入的每一行都会被统计,然后统计结果被输出到屏幕上。 类似下面的输出

Scala代码
  1. # TERMINAL 2: RUNNING NetworkWordCount
  2. $ ./bin/run-example streaming.NetworkWordCount localhost 9999
  3. ...
  4. -------------------------------------------
  5. Time: 1357008430000 ms
  6. -------------------------------------------
  7. (hello,1)
  8. (world,1)
  9. ...
复制代码
Java代码
  1. # TERMINAL 2: RUNNING JavaNetworkWordCount
  2. $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
  3. ...
  4. -------------------------------------------
  5. Time: 1357008430000 ms
  6. -------------------------------------------
  7. (hello,1)
  8. (world,1)
  9. ...
复制代码


核心概念
本节介绍一些Spark和Kafka的概念  Spark cluster: 一个Spark集群至少包含一个worker节点。

cluster-overview3.png
worker node: 一个工作节点可以执行一个或者多个executor.
executor: executor就是一个进程, 负责启在一个worker节点上启动应用,运行task执行计算,存储数据到内存或者磁盘上。 每个Spark应用都有自己的executor。一个executor拥有一定数量的cores, 也被叫做“slots”, 可以执行指派给它的task。
job: 一个并行的计算单元,包含多个task。 在执行Spark action (比如 save, collect)产生; 在log中可以看到这个词。
task: 一个task就是一个工作单元, 可以发送给一个executor执行。 它执行你的应用的实际计算的部分工作。 每个task占用父executor的一个slot (core)。
stage: 每个job都被分隔成多个彼此依赖称之为stage的task(类似MapReduce中的map 和 reduce stage);
共享变量: 普通可序列化的变量复制到远程各个节点。在远程节点上的更新并不会返回到原始节点。因为我们需要共享变量。 Spark提供了两种类型的共享变量。
Scala代码
  1. * Broadcast 变量。  SparkContext.broadcast(v)通过创建, **只读**。
  2. * Accumulator: 累加器,通过SparkContext.accumulator(v)创建,在任务中只能调用add或者+操作,不能读取值。只有驱动程序才可以读取值。
复制代码
Java代码
  1. * Broadcast 变量。  SparkContext.broadcast(v)通过创建, **只读**。
  2. * Accumulator: 累加器,通过SparkContext.accumulator(v)创建,在任务中只能调用add或者+操作,不能读取值。只有驱动程序才可以读取值。
复制代码

receiver: receiver长时间(可能7*24小时)运行在executor。 每个receiver负责一个 input DStream (例如 一个 读取Kafka消息的input stream)。 每个receiver, 加上input DStream会占用一个core/slot.
input DStream: 一个input DStream是一个特殊的DStream, 将Spark Streaming连接到一个外部数据源来读取数据。
kafka topic: topic是发布消息发布的category 或者 feed名. 对于每个topic, Kafka管理一个分区的log,如下图所示:

log_anatomy4.png

分区内的消息都是有序不可变的。
kafka partition: partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions(备注:基于sharding),来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.
kafka consumer group: 在kafka中,每个消费者要标记自己在那个组中。 如果所有的消费者都在同一个组中,则类似传统的queue消息模式,消息只发给一个消费者。 如果消费者都在不同的组中, 则类似发布-订阅消息模式。 每个消费者都会得到所有的消息。 最通用的模式是混用这两种模式,如下图: consumer-groups5.png
关于kafka和消费者线程, 遵循下面的约束: 如果你的消费者读取包含10个分区的 test的topic,
  • 如果你配置你的消费者只使用1个线程, 则它负责读取十个分区
  • 如果你配置你的消费者只使用5个线程, 则每个线程负责读取2个分区
  • 如果你配置你的消费者只使用10个线程, 则每个线程负责读取1个分区
  • 如果你配置你的消费者只使用14个线程, 则10个线程各负责读取1个分区,4个空闲
  • 如果你配置你的消费者只使用8个线程, 则6个线程个负责一个分区,2个线程各负责2个分区
从Kafka并行读取
有几种方法可以并行的读取Kafka的消息。
Spark的KafkaInputDStream (也叫做Kafka “connector”)使用 Kafka high-level consumer API读取数据,所以有两种方式可以并行的读取数据。
  • 多个input DStream: Spark为每个input dstream运行一个receiver. 这意味着多个input dstream可以运行在多个core上并行读取。 如果它们使用相同的topic,则相当于一个load balancer, 一个时间点上只有一个receiver读取。 如果不同的topic,可以同时读取。
Scala代码
  1. val ssc: StreamingContext = ??? // ignore for now
  2. val kafkaParams: Map[String, String] = Map("group.id" -> "test", /* ignore rest */)
  3. val numInputDStreams = 5
  4. val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }
复制代码
Java代码
  1. val ssc: StreamingContext = ??? // ignore for now
  2. val kafkaParams: Map[String, String] = Map("group.id" -> "test", /* ignore rest */)
  3. val numInputDStreams = 5
  4. val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }
复制代码




每个input dstream的消费者线程数。 同一个receiver可以运行多个线程。 可以配置和分区相同的线程。Scala代码
  1. val ssc: StreamingContext = ??? // ignore for now
  2. val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
  3. val consumerThreadsPerInputDstream = 3
  4. val topics = Map("test" -> consumerThreadsPerInputDstream)
  5. val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
复制代码
Java代码
  1. val ssc: StreamingContext = ??? // ignore for now
  2. val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
  3. val consumerThreadsPerInputDstream = 3
  4. val topics = Map("test" -> consumerThreadsPerInputDstream)
  5. val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
复制代码

或者你还可以混合这两种情况:

Scala代码
  1. <font size="2" style="font-weight: normal;">val ssc: StreamingContext = ???
  2. val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
  3. val numDStreams = 5
  4. val topics = Map("zerg.hydra" -> 1)
  5. val kafkaDStreams = (1 to numDStreams).map { _ =>
  6.     KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  7.   }</font>
复制代码
Java代码
  1. <font size="2" style="font-weight: normal;">val ssc: StreamingContext = ???
  2. val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
  3. val numDStreams = 5
  4. val topics = Map("zerg.hydra" -> 1)
  5. val kafkaDStreams = (1 to numDStreams).map { _ =>
  6.     KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  7.   }</font>
复制代码

Spark并行处理
上面介绍了Kafka的并行化读取的控制,在Spark中我们可以进行并行化处理。类似Kafka,Spark将parallelism设置的与(RDD)分区数量有关, 通过在每个RDD分区上运行task进行。在有些文档中,分区仍然被称为“slices”。 同样两个控制手段:
  • input DStreams的数量
  • DStream transformation的重分配(repartition): 这里将获得一个全新的DStream,其parallelism等级可能增加、减少,或者保持原样。在DStream中每个返回的RDD都有指定的N个分区。DStream由一系列的RDD组成,DStream.repartition则是通过RDD.repartition实现。 因此,repartition是从processing parallelism分隔read parallelism的主要途径。在这里,我们可以设置processing tasks的数量,也就是说设置处理过程中所有core的数量。间接上,我们同样设置了投入machines/NICs的数量。
一个DStream转换相关是 union。这个方法同样在StreamingContext中,它将从多个DStream中返回一个统一的DStream,它将拥有相同的类型和滑动时间。union会将多个 DStreams压缩到一个 DStreams或者RDD中,但是需要注意的是,这里的parallelism并不会发生改变。
你的用例将决定你如何分区。如果你的用例是CPU密集型的,你希望对test topic进行5 read parallelism读取。也就是说,每个消费者进程使用5个receiver,但是却可以将processing parallelism提升到20。

Scala代码
  1. val ssc: StreamingContext = ???
  2. val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
  3. val readParallelism = 5
  4. val topics = Map("test" -> 1)
  5. val kafkaDStreams = (1 to readParallelism).map { _ =>
  6.     KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  7.   }
  8. //> collection of five *input* DStreams = handled by five receivers/tasks
  9. val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
  10. //> single DStream
  11. val processingParallelism = 20
  12. val processingDStream = unionDStream(processingParallelism)
  13. //> single DStream but now with 20 partitions
复制代码

Java代码
  1. val ssc: StreamingContext = ???
  2. val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
  3. val readParallelism = 5
  4. val topics = Map("test" -> 1)
  5. val kafkaDStreams = (1 to readParallelism).map { _ =>
  6.     KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  7.   }
  8. //> collection of five *input* DStreams = handled by five receivers/tasks
  9. val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
  10. //> single DStream
  11. val processingParallelism = 20
  12. val processingDStream = unionDStream(processingParallelism)
  13. //> single DStream but now with 20 partitions
复制代码

注意事项
在对Kafka进行读写上仍然存在一些含糊不清的问题,你可以在类似 Multiple Kafka Receivers and UnionHow to scale more consumer to Kafka stream mailing list的讨论中发现。
  • Spark 1.1并不会恢复那些已经接收却没有进行处理的原始数据(查看)。因此,在某些情况下,你的Spark可能会丢失数据。Tathagata Das指出驱动恢复问题会在Spark的1.2版本中解决,现在已经提供Reliable Receiver 和Unreliable Receiver两种Receiver。
  • 1.1版本中的Kafka连接器是基于Kafka的高级消费者API。这样就会造成一个问题,Spark Streaming不可以依赖其自身的KafkaInputDStream将数据从Kafka中重新发送,从而无法解决下游数据丢失问题(比如Spark服务器发生故障)。 Dibyendu Bhattacharya 实现了使用简单消费者API: kafka-spark-consumer.
  • 使用最新的Spark和Kafka,一些bugs已经在最新的Spark和Kafka中修复。
  • 在使用window操作时,window duration和sliding duration必须是DStream批处理的duration的整数倍。
  • 如果分配给应用的core的数量小于或者等于input DStream/receiver数量,则系统只接收数据, 没有额外的core处理数据
  • 接上一条, 你在本地进行测试的时候,如果将master URL设置为“local”的话,则只有一个core运行任务,这明显违反上一条, 只能接收数据,无法处理。
  • Kafak Topic 的分区和 Spark RDD的分区没有任何关系。 它俩是分别设置的。

容错
有两种情况的机器失败。
worker节点失败
receiver接收到的消息在集群间有备份。如果只是一个节点失败, Spark可以恢复。 但是如果是receiver所在的那个节点失败,可能会有一点点数据丢失。 但是Receiver可以在其它节点上恢复启动,继续接收数据。
driver节点失败
如果7*24工作的应用, 如果driver节点失败,Spark Streaming也可以恢复。 Spark streaming定期的把元数据写到HDFS中。 你需要设置checkpoint 文件夹。 为了支持恢复,必须遵循下面的处理:
  • 当应用首次启动时, 它会创建一个新的StreamingContext, 设置所有的流,然后启动start().
  • 当应用因失败而恢复时, 它会从checkpoint文件中的checkpoint重建StreamingContext.
就像这样:
Scala代码
  1. // Function to create and setup a new StreamingContext
  2. def functionToCreateContext(): StreamingContext = {
  3.     val ssc = new StreamingContext(...)   // new context
  4.     val lines = ssc.socketTextStream(...) // create DStreams
  5.     ...
  6.     ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  7.     ssc
  8. }
  9. // Get StreamingContext from checkpoint data or create a new one
  10. val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
  11. // Do additional setup on context that needs to be done,
  12. // irrespective of whether it is being started or restarted
  13. context. ...
  14. // Start the context
  15. context.start()
  16. context.awaitTermination()
复制代码
Java代码
  1. // Create a factory object that can create a and setup a new JavaStreamingContext
  2. JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  3.   @Override public JavaStreamingContext create() {
  4.     JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
  5.     JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
  6.     ...
  7.     jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
  8.     return jssc;
  9.   }
  10. };
  11. // Get JavaStreamingContext from checkpoint data or create a new one
  12. JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
  13. // Do additional setup on context that needs to be done,
  14. // irrespective of whether it is being started or restarted
  15. context. ...
  16. // Start the context
  17. context.start();
  18. context.awaitTermination();
复制代码






已有(5)人评论

跳转到指定楼层
xcl5918 发表于 2015-8-30 11:04:06
非常好!嗯哈哈
回复

使用道具 举报

oveydcc 发表于 2015-8-31 13:21:35
挺好的 谢谢lz啦
回复

使用道具 举报

xuandian124520 发表于 2016-12-30 15:03:46
又是一篇抄袭的
回复

使用道具 举报

Hentai 发表于 2017-1-4 14:05:16
楼主你copy的时候能仔细看看么,有些根本就不是java代码 你也写个java代码
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条