问题导读 1.spark中什么是stage? 2.什么是 kafka topic? 3.本文中kafka和消费者线程遵循哪些约束?
最近在做利用Spark streaming和Kafka进行数据分析的研究, 整理一些相应的开发文档, 做了一些代码实践。 本文特意将这些资料记录下来。 本文最后列出了一些参考的文档,实际调研中参考了很多的资料,并没有完全将它们记录下来, 只列出了主要的一些参考资料。 当前的版本: - Spark: 1.2.0
- Kafka: 0.8.1.1
Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 有以下特点: 易于使用 提供了和批处理一致的高级操作API,可以进行map, reduce, join, window。 容错 Spark Streaming可以恢复你计算的状态, 包括lost work和operator state (比如 sliding windows)。 支持worker节点和driver 节点恢复。 Spark集成 可以结合批处理流和交互式查询。 可以重用批处理的代码。还可以直接使用内置的机器学习算法、图算法包来处理数据。 它可以接受来自文件系统, Akka actors, rsKafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源或者你自己定义的输入源。
它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果流。
Spark Streaming提供了一个高级的抽象模型,叫做discretized stream或者叫做DStream,它代表了一个持续的数据流。DStream既可以从Kafka, Flume, 和 Kinesis中产生, 或者在其它DStream上应用高级操作得到。 内部实现上一个DStream代表一个RDD序列。 一个简单例子在我们开始进入编写我们自己的Spark Streaming程序细节之前, 让我们先快速的看一个简单的Sparking Streaming程序是什么样子的。 这个程序接收网络发过来的文本数据,让我们统计一下文本中单词的数量。 全部代码如下:
首先, 我们导入Spark Streaming类名以及StreamingContext的一些隐式转换到我们的环境中, 这样可以为我们需要的类(比如DStream)增加一些有用的方法。. StreamingContext是所有功能的主入口。 我们创建了一个本地StreamingContext, 它使用两个线程, 批处理间隔为1秒.
Scala代码
- import org.apache.spark._
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- // Create a local StreamingContext with two working thread and batch interval of 1 second.
- // The master requires 2 cores to prevent from a starvation scenario.
- val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
- val ssc = new StreamingContext(conf, Seconds(1))
复制代码
Java代码 - import org.apache.spark.*;
- import org.apache.spark.api.java.function.*;
- import org.apache.spark.streaming.*;
- import org.apache.spark.streaming.api.java.*;
- import scala.Tuple2;
- // Create a local StreamingContext with two working thread and batch interval of 1 second
- SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
- JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1))
复制代码
使用这个context, 我们可以创建一个DStream, 代表来自TCP源的流数据。需要指定主机名和端口(如 localhost 和 9999). Scala代码 - // Create a DStream that will connect to hostname:port, like localhost:9999
- val lines = ssc.socketTextStream("localhost", 9999)
复制代码
Java代码 - // Create a DStream that will connect to hostname:port, like localhost:9999
- JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
复制代码
这一行代表从数据服务器接受到的数据流. DStream中每条记录是一行文本. 接下来, 我们想使用空格分隔每一行,这样就可以得到文本中的单词。 Scala代码 - // Split each line into words
- val words = lines.flatMap(_.split(" "))
复制代码
Java代码 - // Split each line into words
- JavaDStream<String> words = lines.flatMap(
- new FlatMapFunction<String, String>() {
- @Override public Iterable<String> call(String x) {
- return Arrays.asList(x.split(" "));
- }
- });
复制代码
flatMap 是一个一对多的DStream操作, 它从源DStream中的每一个Record产生多个Record, 这些新产生的Record组成了一个新的DStream。 在我们的例子中, 每一行文本被分成了多个单词, 结果得到单词流DStream. 下一步, 我们想统计以下单词的数量.
Scala代码 - import org.apache.spark.streaming.StreamingContext._
- // Count each word in each batch
- val pairs = words.map(word => (word, 1))
- val wordCounts = pairs.reduceByKey(_ + _)
- // Print the first ten elements of each RDD generated in this DStream to the console
- wordCounts.print()
复制代码
Java代码 - // Count each word in each batch
- JavaPairDStream<String, Integer> pairs = words.map(
- new PairFunction<String, String, Integer>() {
- @Override public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<String, Integer>(s, 1);
- }
- });
- JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override public Integer call(Integer i1, Integer i2) throws Exception {
- return i1 + i2;
- }
- });
- // Print the first ten elements of each RDD generated in this DStream to the console
- wordCounts.print();
复制代码
单词DStream 被mapped (one-to-one transformation) 成*(word, 1)对*的DStream ,然后reduced 得到每一批单词的频度. 最后, wordCounts.print()会打印出每一秒产生的一些单词的统计值。 注意当这些行执行时,Spark Streaming仅仅设置这些计算, 它并没有马上被执行。 当所有的计算设置完后,我们可以调用下面的代码启动处理
Scala代码 - ssc.start() // Start the computation
- ssc.awaitTermination() // Wait for the computation to terminate
复制代码
Java代码 - jssc.start(); // Start the computation
- jssc.awaitTermination(); // Wait for the computation to terminate
复制代码
Scala代码 复制代码
Java代码 复制代码
然后, 在另一个终端中, 你可以启动例子 Scala代码 - $ ./bin/run-example streaming.NetworkWordCount localhost 9999
复制代码
Java代码 - $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
复制代码
然后, 在netcat服务器中输入的每一行都会被统计,然后统计结果被输出到屏幕上。 类似下面的输出
Scala代码 - # TERMINAL 2: RUNNING NetworkWordCount
- $ ./bin/run-example streaming.NetworkWordCount localhost 9999
- ...
- -------------------------------------------
- Time: 1357008430000 ms
- -------------------------------------------
- (hello,1)
- (world,1)
- ...
复制代码
Java代码 - # TERMINAL 2: RUNNING JavaNetworkWordCount
- $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
- ...
- -------------------------------------------
- Time: 1357008430000 ms
- -------------------------------------------
- (hello,1)
- (world,1)
- ...
复制代码
核心概念本节介绍一些Spark和Kafka的概念 Spark cluster: 一个Spark集群至少包含一个worker节点。
worker node: 一个工作节点可以执行一个或者多个executor. executor: executor就是一个进程, 负责启在一个worker节点上启动应用,运行task执行计算,存储数据到内存或者磁盘上。 每个Spark应用都有自己的executor。一个executor拥有一定数量的cores, 也被叫做“slots”, 可以执行指派给它的task。 job: 一个并行的计算单元,包含多个task。 在执行Spark action (比如 save, collect)产生; 在log中可以看到这个词。 task: 一个task就是一个工作单元, 可以发送给一个executor执行。 它执行你的应用的实际计算的部分工作。 每个task占用父executor的一个slot (core)。 stage: 每个job都被分隔成多个彼此依赖称之为stage的task(类似MapReduce中的map 和 reduce stage); 共享变量: 普通可序列化的变量复制到远程各个节点。在远程节点上的更新并不会返回到原始节点。因为我们需要共享变量。 Spark提供了两种类型的共享变量。 Scala代码 - * Broadcast 变量。 SparkContext.broadcast(v)通过创建, **只读**。
- * Accumulator: 累加器,通过SparkContext.accumulator(v)创建,在任务中只能调用add或者+操作,不能读取值。只有驱动程序才可以读取值。
复制代码
Java代码 - * Broadcast 变量。 SparkContext.broadcast(v)通过创建, **只读**。
- * Accumulator: 累加器,通过SparkContext.accumulator(v)创建,在任务中只能调用add或者+操作,不能读取值。只有驱动程序才可以读取值。
复制代码
receiver: receiver长时间(可能7*24小时)运行在executor。 每个receiver负责一个 input DStream (例如 一个 读取Kafka消息的input stream)。 每个receiver, 加上input DStream会占用一个core/slot. input DStream: 一个input DStream是一个特殊的DStream, 将Spark Streaming连接到一个外部数据源来读取数据。 kafka topic: topic是发布消息发布的category 或者 feed名. 对于每个topic, Kafka管理一个分区的log,如下图所示:
分区内的消息都是有序不可变的。 kafka partition: partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions(备注:基于sharding),来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力. kafka consumer group: 在kafka中,每个消费者要标记自己在那个组中。 如果所有的消费者都在同一个组中,则类似传统的queue消息模式,消息只发给一个消费者。 如果消费者都在不同的组中, 则类似发布-订阅消息模式。 每个消费者都会得到所有的消息。 最通用的模式是混用这两种模式,如下图:
关于kafka和消费者线程, 遵循下面的约束: 如果你的消费者读取包含10个分区的 test的topic, - 如果你配置你的消费者只使用1个线程, 则它负责读取十个分区
- 如果你配置你的消费者只使用5个线程, 则每个线程负责读取2个分区
- 如果你配置你的消费者只使用10个线程, 则每个线程负责读取1个分区
- 如果你配置你的消费者只使用14个线程, 则10个线程各负责读取1个分区,4个空闲
- 如果你配置你的消费者只使用8个线程, 则6个线程个负责一个分区,2个线程各负责2个分区
从Kafka并行读取有几种方法可以并行的读取Kafka的消息。 Spark的KafkaInputDStream (也叫做Kafka “connector”)使用 Kafka high-level consumer API读取数据,所以有两种方式可以并行的读取数据。 - 多个input DStream: Spark为每个input dstream运行一个receiver. 这意味着多个input dstream可以运行在多个core上并行读取。 如果它们使用相同的topic,则相当于一个load balancer, 一个时间点上只有一个receiver读取。 如果不同的topic,可以同时读取。
Scala代码
- val ssc: StreamingContext = ??? // ignore for now
- val kafkaParams: Map[String, String] = Map("group.id" -> "test", /* ignore rest */)
- val numInputDStreams = 5
- val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }
复制代码
Java代码
- val ssc: StreamingContext = ??? // ignore for now
- val kafkaParams: Map[String, String] = Map("group.id" -> "test", /* ignore rest */)
- val numInputDStreams = 5
- val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }
复制代码
每个input dstream的消费者线程数。 同一个receiver可以运行多个线程。 可以配置和分区相同的线程。Scala代码- val ssc: StreamingContext = ??? // ignore for now
- val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
- val consumerThreadsPerInputDstream = 3
- val topics = Map("test" -> consumerThreadsPerInputDstream)
- val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
复制代码
Java代码 - val ssc: StreamingContext = ??? // ignore for now
- val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
- val consumerThreadsPerInputDstream = 3
- val topics = Map("test" -> consumerThreadsPerInputDstream)
- val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
复制代码
或者你还可以混合这两种情况:
Scala代码 - <font size="2" style="font-weight: normal;">val ssc: StreamingContext = ???
- val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
- val numDStreams = 5
- val topics = Map("zerg.hydra" -> 1)
- val kafkaDStreams = (1 to numDStreams).map { _ =>
- KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
- }</font>
复制代码
Java代码- <font size="2" style="font-weight: normal;">val ssc: StreamingContext = ???
- val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
- val numDStreams = 5
- val topics = Map("zerg.hydra" -> 1)
- val kafkaDStreams = (1 to numDStreams).map { _ =>
- KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
- }</font>
复制代码
Spark并行处理上面介绍了Kafka的并行化读取的控制,在Spark中我们可以进行并行化处理。类似Kafka,Spark将parallelism设置的与(RDD)分区数量有关, 通过在每个RDD分区上运行task进行。在有些文档中,分区仍然被称为“slices”。 同样两个控制手段: - input DStreams的数量
- DStream transformation的重分配(repartition): 这里将获得一个全新的DStream,其parallelism等级可能增加、减少,或者保持原样。在DStream中每个返回的RDD都有指定的N个分区。DStream由一系列的RDD组成,DStream.repartition则是通过RDD.repartition实现。 因此,repartition是从processing parallelism分隔read parallelism的主要途径。在这里,我们可以设置processing tasks的数量,也就是说设置处理过程中所有core的数量。间接上,我们同样设置了投入machines/NICs的数量。
一个DStream转换相关是 union。这个方法同样在StreamingContext中,它将从多个DStream中返回一个统一的DStream,它将拥有相同的类型和滑动时间。union会将多个 DStreams压缩到一个 DStreams或者RDD中,但是需要注意的是,这里的parallelism并不会发生改变。 你的用例将决定你如何分区。如果你的用例是CPU密集型的,你希望对test topic进行5 read parallelism读取。也就是说,每个消费者进程使用5个receiver,但是却可以将processing parallelism提升到20。
Scala代码 - val ssc: StreamingContext = ???
- val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
- val readParallelism = 5
- val topics = Map("test" -> 1)
- val kafkaDStreams = (1 to readParallelism).map { _ =>
- KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
- }
- //> collection of five *input* DStreams = handled by five receivers/tasks
- val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
- //> single DStream
- val processingParallelism = 20
- val processingDStream = unionDStream(processingParallelism)
- //> single DStream but now with 20 partitions
复制代码
Java代码 - val ssc: StreamingContext = ???
- val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
- val readParallelism = 5
- val topics = Map("test" -> 1)
- val kafkaDStreams = (1 to readParallelism).map { _ =>
- KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
- }
- //> collection of five *input* DStreams = handled by five receivers/tasks
- val unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it
- //> single DStream
- val processingParallelism = 20
- val processingDStream = unionDStream(processingParallelism)
- //> single DStream but now with 20 partitions
复制代码
注意事项 Spark 1.1并不会恢复那些已经接收却没有进行处理的原始数据( 查看)。因此,在某些情况下,你的Spark可能会丢失数据。Tathagata Das指出驱动恢复问题会在Spark的1.2版本中解决,现在已经提供 Reliable Receiver 和Unreliable Receiver两种Receiver。 1.1版本中的Kafka连接器是基于Kafka的高级消费者API。这样就会造成一个问题,Spark Streaming不可以依赖其自身的KafkaInputDStream将数据从Kafka中重新发送,从而无法解决下游数据丢失问题(比如Spark服务器发生故障)。 Dibyendu Bhattacharya 实现了使用简单消费者API: kafka-spark-consumer. 使用最新的Spark和Kafka,一些bugs已经在最新的Spark和Kafka中修复。 在使用window操作时,window duration和sliding duration必须是DStream批处理的duration的整数倍。 如果分配给应用的core的数量小于或者等于input DStream/receiver数量,则系统只接收数据, 没有额外的core处理数据 接上一条, 你在本地进行测试的时候,如果将master URL设置为“local”的话,则只有一个core运行任务,这明显违反上一条, 只能接收数据,无法处理。 Kafak Topic 的分区和 Spark RDD的分区没有任何关系。 它俩是分别设置的。
容错有两种情况的机器失败。 worker节点失败receiver接收到的消息在集群间有备份。如果只是一个节点失败, Spark可以恢复。 但是如果是receiver所在的那个节点失败,可能会有一点点数据丢失。 但是Receiver可以在其它节点上恢复启动,继续接收数据。 driver节点失败如果7*24工作的应用, 如果driver节点失败,Spark Streaming也可以恢复。 Spark streaming定期的把元数据写到HDFS中。 你需要设置checkpoint 文件夹。 为了支持恢复,必须遵循下面的处理: - 当应用首次启动时, 它会创建一个新的StreamingContext, 设置所有的流,然后启动start().
- 当应用因失败而恢复时, 它会从checkpoint文件中的checkpoint重建StreamingContext.
就像这样: Scala代码 - // Function to create and setup a new StreamingContext
- def functionToCreateContext(): StreamingContext = {
- val ssc = new StreamingContext(...) // new context
- val lines = ssc.socketTextStream(...) // create DStreams
- ...
- ssc.checkpoint(checkpointDirectory) // set checkpoint directory
- ssc
- }
- // Get StreamingContext from checkpoint data or create a new one
- val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
- // Do additional setup on context that needs to be done,
- // irrespective of whether it is being started or restarted
- context. ...
- // Start the context
- context.start()
- context.awaitTermination()
复制代码
Java代码 - // Create a factory object that can create a and setup a new JavaStreamingContext
- JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
- @Override public JavaStreamingContext create() {
- JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
- JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
- ...
- jssc.checkpoint(checkpointDirectory); // set checkpoint directory
- return jssc;
- }
- };
- // Get JavaStreamingContext from checkpoint data or create a new one
- JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
- // Do additional setup on context that needs to be done,
- // irrespective of whether it is being started or restarted
- context. ...
- // Start the context
- context.start();
- context.awaitTermination();
复制代码
|