分享

Kafka原理详解

问题导读:


1、Kafka与其他常用的Message Queue有什么区别?
2、Kafka的使用场景有哪些?
3、Kafka的架构包括哪些?
4、Kafka消息流是怎样处理的?
5、Kafka offset机制是什么?



Kafka简介
Kafka是由LinkedIn开发的一个分布式的消息系统,最初是用作LinkedIn的活动流(Activity Stream)和运营数据处理的基础。

Kafka使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。

综上,Kafka是一种分布式的,基于发布/订阅的消息系统,能够高效并实时的吞吐数据,以及通过分布式集群及数据复制冗余机制(副本冗余机制)实现数据的安全。

Kafka和常用Message Queue对比
RabbitMQ
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

Redis
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。

ZeroMQ
ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty(NIO)作为传输模块)。

ActiveMQ
ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。

Kafka的使用场景
Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,kafka并没有提供JMS中的"事务性"“消息传输担保(消息确认机制)”"消息分组"等企业级特性【删减】;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)

Website activity tracking
kafka可以作为"网站活性跟踪"的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等

Metric
Kafka通常被用于可操作的监控数据。这包括从分布式应用程序来的聚合统计用来生产集中的运营数据提要。

Log Aggregatio
kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统。

Kafka架构
20190424145849985.png

producer: 消息生产者,发布消息到 kafka 集群的终端或服务。

broker: kafka 集群中包含的服务器,存储消息。broker (经纪人,消费转发服务)

topic: 每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。

partition: partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka
分配的单位是 partition。

consumer: 从 kafka 集群中消费消息的终端或服务。

Consumer group: high-level consumer API 中,每个consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。即组间数据是共享的,组内数据是竞争的

replica: partition 的副本,保障 partition 的高可用。

leader: replica 中的一个角色, producer 和consumer 只跟 leader 交互。

follower: replica 中的一个角色,从 leader 中复制数据。

controller: kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。

zookeeper: kafka 通过 zookeeper来存储集群的 meta 信息。

Segment: partition物理上由多个segment组成,每个Segment存着message信息

Topic和Partition

  • Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic(主题)。Topic在逻辑上可以被认为是一个queue,每条消息都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。

  • Partition

Parition是物理上的概念,每个Topic包含一个或多个Partition。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件,文件夹命名是[topicname][partition][序号],一个topic可以有无数多的partition。在kafka配置文件中可随时更改num.partitions参数来配置更改topic的partition数量,也可以在创建Topic时通过参数指定parittion数量。Topic创建之后通过Kafka提供的工具也可以修改partiton数量。

一般来说:
(1)一个Topic的Partition数量大于等于Broker的数量,可以提高吞吐率。
(2)同一个Partition的Replica尽量分散到不同的机器,高可用。

add a new partition时: partition里面的message不会重新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会重新参与所有partition的load balance

Kafka消息流处理
Producer 写入消息序列图:
20190424153503244.png
流程说明:

  • producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该partition 的 leader
  • producer 将消息发送给该 leader
  • leader 将消息写入本地 log
  • followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
  • leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK。


producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。

对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。

例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可Partition文件超过1GB时删除旧数据,配置如下所示:
[mw_shl_code=text,true]#符合删除条件的日志文件的最短期限
log.retention.hours=168

#日志文件的最大的大小。当达到此大小时,将创建一个新的日志段。
log.segment.bytes=1073741824

#定期检查日志是否符合删除条件
log.retention.check.interval.ms=300000

#如果设置了Log.Cleaner.Enable=true,则将启用Cleaner,然后可以标记单个日志进行日志压缩
log.cleaner.enable=false[/mw_shl_code]
Kafka的高可用
Partition Replica:
同一个 partition 可能会有多个 replica(对应 server.properties 配置中的default.replication.factor=N)。

没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。

引入replication 之后,同一个partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。

每个partition可以在其他的kafka broker节点上存副本,这样当某个kafka broker节点宕机时,不会影响这个kafka集群的正常工作。

replica副本的方式是按照kafka broker的顺序来村的。例如有5个kafka broker节点,某个topic有3个partition,每个partition存2个副本,那么partition1存broker1,broker2,partition2broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,否则报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本)。这样即使某个broker宕机,也能保证整个kafka集群内的数据依然是完整的。

当然,replica副本数越高,系统越稳定,但是会导致资源和性能上的下降;replica副本少的话,也会造成系统丢数据的风险。在性能和数据可靠性之间的平衡根据具体的业务自己去衡量。

leader failover

当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。

kafka 在 zookeeper 中(/brokers/…/state)动态维护了一个 ISR(in-sync replicas),由写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition可以在容忍 f 个 replica 失效的情况下保证消息不丢失。

比如 一个分区 有5个副本,挂了4个,剩一个副本,依然可以工作。

注意:kafka的选举不同于zookeeper,用的不是过半选举。

当所有 replica 都不工作时,有两种可行的方案:

等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。

选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。

kafka 0.8.* 使用第二种方式。此外, kafka 通过 Controller 来选举 leader。

Kafka offset机制
区数据的Offset存储机制

一个分区在文件系统里存储为一个文件夹。文件夹里包含日志文件和索引文件。其文件名是其包含的offset的最小的条目的offset。
20190424160921710.png

20190424161038777.png
实际上offset的存储采用了稀疏索引,这样对于稠密索引来说节省了存储空间,但代价是查找费点时间。

Consumer消费者的offset存储机制

Consumer在从broker读取消息后,可以选择commit,该操作会在Kakfa中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。

通过这一特性可以保证同一消费者从Kafka中不会重复消费数据。

底层实现原理:

执行:sh kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic enbook –from-beginning --new-consumer

执行:sh kafka-consumer-groups.sh --bootstrap-server hadoop01:9092 --list --new-consumer

查询得到的group id
20190424161408714.png
进入kafka-logs目录查看,会发现多个很多目录,这是因为kafka默认会生成50个__consumer_offsets
的目录,用于存储消费者消费的offset位置。
20190424161435822.png

Kafka会使用下面公式计算该消费者group位移保存在__consumer_offsets的哪个目录上:

Math.abs(groupID.hashCode()) % numPartitions

Kafka消息投递可靠性
一个消息如何算投递成功,Kafka提供了三种模式:

  • 第一种是啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;
  • 第二种是Master-Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能;
  • 第三种模型,即只要Master确认收到消息就算投递成功;


实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型。

消息在broker上的可靠性: 因为消息会持久化到磁盘上,所以如果正常stop一个broker,其上的数据不会丢失;但是如果不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这可以通过配置flush页面缓存的周期、阈值缓解,但是同样会频繁的写磁盘会影响性能,又是一个选择题,根据实际情况配置。

消息消费的可靠性: Kafka提供的是“At least once”模型,因为消息的读取进度由offset提供,offset可以由消费者自己维护也可以维护在zookeeper里,但是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的情况,这种情况同样可以通过调整commit offset周期、阈值缓解,甚至消费者自己把消费和commit offset做成一个事务解决,但是如果你的应用不在乎重复消费,那就干脆不要解决,以换取最大的性能。

Kafka的持久化
Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。我们众所周知IO读取是非常耗资源的性能也是最慢的,这就是为了数据库的瓶颈经常在IO上,需要换SSD硬盘的原因。但是Kafka作为吞吐量极高的MQ,却可以非常高效的message持久化到文件。这是因为Kafka是顺序写入的,速度非常快,这也是高吞吐量的原因。由于message的写入持久化是顺序写入的,因此message在被消费的时候也是按顺序被消费的,保证partition的message是顺序消费的。一般的机器,单机每秒100k条数据

Kafka的高吞吐量
Kafka的高吞吐量体现在读写上,分布式并发的读和写都非常快:
写的性能体现在以o(1)的时间复杂度进行顺序写入。Producer采用异步push方式,极大提高Kafka系统的吞吐率
读的性能体现在以o(1)的时间复杂度进行顺序读取。 对topic进行partition分区,consume group中的consume线程可以以很高能性能进行顺序读。



最新经典文章,欢迎关注公众号

来源:CSDN

作者:暁洣

原文:《大数据学习-Kafka原理详解》

https://blog.csdn.net/weixin_43192721/article/details/89496415





已有(3)人评论

跳转到指定楼层
yang000 发表于 2019-5-6 14:12:49
Kafka的使用场景
Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,kafka并没有提供JMS中的"事务性"“消息传输担保(消息确认机制)”"消息分组"等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)

确定没有吗???
回复

使用道具 举报

阿飞 发表于 2019-5-6 14:49:02
yang000 发表于 2019-5-6 14:12
Kafka的使用场景
Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错, ...

老铁读的很细,这个已经有了。
kafka的事务已经有了。可参考下面内容
https://blog.csdn.net/ransom0512/article/details/78840042

感谢老铁提出疑问,后面我们会在质量方面加强。
希望老铁多多监督。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条