立即注册 登录
About云 返回首页

sstutu的个人空间 https://aboutyun.com/?70 [收藏] [复制] [分享] [RSS]

日志

spark读取kafka为什么这么慢

已有 2497 次阅读2018-9-2 13:09 |系统分类:Kafka

 sparkstreaming2.3, kafka消息处理慢,进行全流程打点,发现消息创建的时间和streaming中开始处理的时间相差比较大,处理过程很快,随着streaming运行时间增长,消息拉取越来越慢,加大cpu 内存都不起作用。
qq群里老铁遇到这么个问题,而且很多遇到了,这里说下,遇到问题,首先要做的是分析日志,然后分析源码。

我们不知道该如何做,下面案例可以参考:

环境:3台相同配置主机 每台主机一个broker、standalone方式运行spark集群,每台机器2个worker

TOPIC分布
Topic: MessageBody Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: MessageBody Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: MessageBody Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: MessageBody Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: MessageBody Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: MessageBody Partition: 5 Leader: 2 Replicas: 2 Isr: 2

leader0 -> broker id[0] -> 157
leader0 -> broker id[3] -> 157
leader1 -> broker id[1] -> 152
leader1 -> broker id[4] -> 152
leader2 -> broker id[2] -> 155
leader2 -> broker id[5] -> 155



本次每个分区的总和相同都是53,说明上次分析的第2点不成立(可能时数据日志没归零造成的)
但是,第一点依然成立,说明目前环境下kafka分区序列化到spark分区的过程中本地性是有问题的,并不是之前所认为的leader的数据就在响应的spark机器上



从kafka获取数据异常缓慢,从spark webUI观察发现各个exector执行的时间差异很大,又得很快有的很慢

于是怀疑数据倾斜,kafka producer发送数据采用轮询发送重写partition的接口理论上应该是均匀的,于是在spark程序中验证了一下


val kafkaStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParamsSeq, Set(props.getString(AppConstant.KafkaTopicName)))

kafkaStreams.foreachRDD(rdd => {
      rdd.mapPartitionsWithIndex((id, iter) => {
        val arr = iter.toArray
        val size = arr.map(m => (m._1.length + m._2.length)).reduce(_ + _)
        Iterator((id, arr.size, size))
      }).foreachPartition(m => LOG.info("KAFKA: " + m.mkString(",")))
    })



记录了 RDD【KafkaRDD】中partition中每个partition中数据个数和字节总数,发现两者都很均匀,这说明数据是均匀的没有倾斜,由于计算没有shffle,计算本身比较简单,所以计算应该不会占用过多时间,所以推断占用时间比较长的应该是取数据的时间(从kafka序列化到spark中所用的时间),取数据为什么会慢呢,这部分代码是spark本身已经实现的部分,所以查找源码。中间反复想,理论上计算时间应该均匀的分布在各个节点上,但是实际上并没有,日志中发现每个exector都获取到了6个partition信息,这点很奇怪,理论上应该只获取两个,后又在spark源码中发现

log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

这句话比较关键,收集了这个日志,写了个spark程序进行了分析

spark master主机调整到157上,半分钟执行一次【KEY: topic_主机_partitionID,  VALUE: partitionid出现次数】
(MessageBody_152_0,12),(MessageBody_152_1,17),(MessageBody_152_2,16),(MessageBody_152_3,17),(MessageBody_152_4,24),(MessageBody_152_5,20)
(MessageBody_155_0,24),(MessageBody_155_1,17),(MessageBody_155_2,17),(MessageBody_155_3,20),(MessageBody_155_4,12),(MessageBody_155_5,16)
(MessageBody_157_0,17),(MessageBody_157_1,19),(MessageBody_157_2,20),(MessageBody_157_3,16),(MessageBody_157_4,17),(MessageBody_157_5,17)

到此处已经证明,计算是不均匀的,并且计算不是本地化的【因为一个节点出现了的分区数是所有6个,而不是其中的两个】,

然后查找为什么不本地化呢,它应该本地化啊

override def getPreferredLocations(thePart: Partition): Seq[String] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    // TODO is additional hostname resolution necessary here
    Seq(part.host)
  }

,我就++了,于是查了一些资料,看了一些源码最终发现了两个比较有用的博客:

1、http://www.wtoutiao.com/p/1bcpfub.html

2、http://superlxw1234.iteye.com/blog/2221279

最终解决了问题,问题的根源是启动spark集群时使用$SPARK_HOME/sbin/start-all.sh时,启动的worker节点并没有按照slaves中配置的主机名启动而是按照ip启动,在默认8080端口可以看到,最终在计算本地性时,一方是主机名集合一方是ip地址集合结果造成字符串比较时找不到,本地化全部变成ANY,也就是随机获取计算节点那就意味着可能要将数据发送到计算节点上,实际上证明没有本地化大部分时间都没那么好运,基本上都没有在本机上节点运行【大量的网络IO,慢的要死】

启动spark集群:

1、$SPARK_HOME/sbin/start-master.sh

2、$SPARK_HOME/sbin/start-slave.sh -h <hostname> <masterURI>  例:$SPARK_HOME/sbin/start-slave.sh -h master.gt spark://master.gt:7077

然后再次运行测试程序,一切就和预想的一样了


记录:查看spark源码一直以为kafka的分区id和spark的分区id是一样的,但执行之后发现并不一样,然后仔细查看源码发现,KafkaRDD的分区index是zip得到的并没有使用kafka的topic分区id

参考:
https://blog.csdn.net/gongkongrs/article/details/51744181

路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条