分享

Kafka文档:基本介绍(Getting Start)


问题导读

1.kafka中offset是什么?
2.offset是否由consumer控制?
3.传统消息传递系统采用哪两种模式?






来源:
http://kafka.apache.org/documentation.html#gettingStarted


说明:
原文中某些专有名词不做翻译:
kafka
topic
partition
consumer
producer
server
client
high-level


1、开始

1.1 介绍


kafka可提供分布式、分区的、可进行备份的提交日志的服务。它是可以进行消息传递的系统,同时设计比较独特。

上述所说意味着什么呢?

首先,我们来回顾一下基本的消息传递术语:
-维护消息的信息流(feed),并按照topics对消息进行分类
-producers用来发布消息
-consumer订阅topics,并消费topics下所发布的信息流(feed)
-以集群方式运行,集群由一个或多个称作broker的servers构成

因此,在high  level上来看,producer通过网络向kafka集群发送消息,同时,kafka集群可以向consumers提供这些消息,如图所示:
                                                                         1.gif
clients和servers之间通信通过简单的、高性能的、语言无关的TCP 协议进行;kafka不仅提供了Java client,同时还提供了很多其他语言形式的client。



Topics和Logs

首先深入了解一下kafka提供的high-level的抽象---topic
topic可以看成不同消息的类别或者信息流,不同的消息根据就是通过不同的topic进行分类或者汇总,然后producer将不同分类的消息发往不同的topic。对于每一个topic,kafka集群维护一个分区的日志:如图所示:

                                                               2.gif

上图中可以看出,每个partition中的消息序列都是有序的,并且不可更改,这些分区可以在尾部不停的追加消息。同一分区中的不同消息都会分配一个唯一的数字进行标识,这个数字被称为offset,用来进行消息的区分,每一条消息都是由若干个字节构成。

kafka集群可以保存所有发布的消息---无论消息是否consumed,保存时间是可配置的。例如,如果日志保存时间设置为两天,则从日志保存之时开始,两天之内都是可供消费的,然而两天之后消息会被抛弃以释放空间。因此,Kafka可以高效持久的保存大量的数据。

事实上,每个消费者所需要保存的元数据只有一个,即”offset“,即主要用来记录日志中当前consume的位置。offset是由consumer所控制的: 通常情况下,offset会随着consumer阅读消息而线性的递增,好似offset只能被动跟随consumer阅读变化,但实际上,offset完全是由consumer控制的,consumer可以从任何它喜欢的位置consum消息。例如,consumer可以将offset重新设置为先前的值并重新consum数据。

这些特征共同说明: Kafka consumer可以很廉价的进行操作----在不必影响集群和其他consumers的情况下,consumers可以很自由的来去。例如,你可以使用kafka提供的命令行工具,去追踪任何topic的内容,而不必改变当前consumers 所consum的topic内容。

日志服务器中存在partitions有以下若干目的:
第一,多个分区的共存可以使日志规模超过单个server的尺寸;需要注意的是,每一个单独的分区必须符合所在servers的尺寸,即同一个topic的同一个partition的数据只能在同一台server上存储,也就是说同一个topic下的同一个partition的数据不能同时存放于两台server上,但是同一个topic可以包含很多partitions,这样就使同一个topic可以包含任意数量的数据,理论上你可以通过增加server的数目来增加partitions的数目。

第二,多个partitions的存在,可以作为数据并行处理的单位,而不是以bit为单位(既可以有多个consumers对不同的partition进行consume,也可以有不同的consumers对同一个partition进行consume,因为offset是由consumer控制的)。


Distribution

日志的partitions分布在Kafka集群中的servers上,每个服务器都可以处理数据以及共享分区的需求。每个分区都可以进行备份,进行备份的servers数目是可以配置的,以提高容错能力。

每个partitions拥有一台称为“leader”的server,同时拥有0或者多个称为“followers”的servers;“leader”处理所有有关partition的读写请求,“follower”随之对“leader"进行备份。如果”leader“失败,则”followers"其中之一自动成为新的“leader”。 每个server都是它名下partitions的“leader”,同时作为其他partitions的“follower”,这样可以很好均衡集群负载。

Producers

Producers的作用实际上是向它们选择的topic发布数据。每个producer负责将消息分发到topic名下不同的partition上去。这种分发可以通过一种循环方式进行而仅为了负载均衡,也可以通过某些语义分区函数进行分发(即以消息中的某些关键字进行区分)。More  on the use of  partitioning  in  a second。

Consumers

传统消息传递系统采用两种模式: queuing   和    publish-subscribe。队列模式中,众多consumers可能从同一个server读取消息,而每条消息只能流向其中一个consumer。在发布-订阅模式下,每条消息都会发向所有的consumers。Kafka提供一种单独的consumer抽象,此抽象具有以上两种模式的特征----名为consumer  group。

队列模式:
                                              3.gif


发布-订阅者模式:
                                           4.gif


kafka  consumer group 模式:
                                             5.gif


Consumers使用consumer  group名字标识它们自己,每个topic的每条消息都会发送到某个consumer实例,这些实例所在的消费组需要提出订阅方可获取消息。这些consumer实例可以处于单独的进程中,也可以处于单独机器上。

若是所有consumer实例拥有共同的consumer组,则这种模式就像均衡consumers之间负载的传统队列模式。

若是所有consumer实例都具有不同的consumer组,则这种模式就像将所有消息广播到所有consumers的发布-订阅者模式。

可以发现,topics拥有少量的consumer组,其中之一就作为逻辑上的订阅者。每个组都是由多个consumer实例构成,目的是为了可伸缩性和容错性。相比发布-订阅者语义模式,kafka只有一点不同,即订阅者是consumers集群而非独立进程。

相对于传统的消息发送系统,Kafka拥有更强的次序保证。

                                 6.gif      

传统队列模式在server上按顺序存储消息,如果多个consumers从队列中consum,则server按照消息存储顺序发出消息。 然而,即使server按照顺序发出消息,由于消息是异步发向consumers,则这些消息可能并不是按照存储顺序到达consumers的(例如,server上消息的存放顺序是M1,M2,M3,M4,consumers是C1,C2,C3,C4,异步发送为:M1发向C2,M2发向C4,M3发向C1,M4发向C3,则消息到达consumer的时间次序可能是,M4,M2,M1,M3,这样的结果就是consumer处理消息的顺序与存储顺序不一致,就打乱了原始消息次序)。这就意味着:消息顺序在并行consumption中丢失了。消息传递系统的工作通常围绕这个原则:consumer唯一,即一个队列只允许有一个消费者,但这也就意味着在处理中丧失了并行机制。

Kafka这方面就做的很好。 通过设置并行机制的概念--即partition---在topics内部,Kafka可以同时提供消息顺序保证以及众多consumer 进程之间负载平衡。通过将topic内部的partitions分配给consumer组中的consumers,这样就使每个partition可以准确的由组中确定的consumer所消费。通过以上措施,可以保证某个consumer会成为某个partition的确定consumer,并且consumer会按照数据存储顺序消费。众多分区的存在可以平衡众多consumer实例之间的负载。需要注意的是,consumer实例不能多于partitions。

对Kafka来说,同一个partition内部消息都是有序的,但是同一个topic内不同partition中消息不能保证有序。通过关键字实现消息分区,以及每个分区内消息的有序,可以满足大多数应用。若是需要所有消息都是有序的,则可以通过设置一个topic只有一个partition来实现,这就意味着只有一个consumer  进程。

Kafka只能提供同一个partition内部消息的总体顺序,而不能提供同一个topic不同partition之间消息的顺序。  每个分区通过关键字划分数据的而获得的排序能力能够满足大多数应用。然而, 如果你需要所有消息都是有序的,则你可以设置只有一个topic,只有一个partition,这就是说也就只能有一个消费进程(因为,同一个partition的数据只能固定的发往某一个consumer组中的某一个确定的consumer,所以则只有一个consumer组的某一个consumer才能消费此topic下partition的数据,因此只有一个消费线程)。


Guarantees

在high-level上,Kafka给与了以下保证:

-发往特定topic partition的消息将会按照发送顺序进行追加。例如,如果M1和M2都是由相同的producer发送,而M1发送顺序较早,则M1在partition中offset要小于M2,就是说M1在消息日志中出现较早。

-consumer实例可以按照消息在日志中存储的顺序查看消息。

-若是topic 的参数--replication-factor 为N,即设置备份server为N,则即使N-1个备份server都失败了,容错性可以保证没有丢失任何提交的日志消息。

更多有关guarantees的细节在文档设计部分给出。


1.2  Use  Cases

此处有一些Apache  Kafa广泛使用的例子描述。如果需要这些方面的概述,请看  this blog  post。

Messaging(消息传递系统)

Kafka 不止可以作为传统消息broker的替代。消息brokers 存在原因很多:例如将数据产生和处理相剥离,缓存未处理的消息等。 相对大多数消息传递系统而言,Kafka拥有更加出色的吞吐量、内置的partitioning、备份、以及容错,这些优势使得Kafka成为大规模消息处理应用的好选择。

在我们经验中,消息传递应用都是相对的低吞吐量,但同时又需要端对端之间比较低的延迟,以及Kafka所能够提供的强大的持久性保证。

在这一领域中,Kafka可以与诸如ActiveMQ或者RabbitMQ等传统消息传递系统相比较。

Website  Activity  Tracking(网站活动跟踪)

kakfa最初应用实例是作为一系列实时发布-订阅的信息流(feed),以此重建用户活动跟踪管道。这就是说:网站活动(页面查看、搜索、或者其他用户采取的行为)发布到集中的topics名下(每一种活动类型对应一个topic)。这些信息流(feed)用于订阅的应用实例范围为:实时处理、实时监测、以及加载Hadoop或者加载针对离线处理和报告的离线数据仓库系统等。

用户页面查看产生大量活动消息,活动监测随之产生大量的数据。

Metrics(指标监测)

Kafka通常用来监控操作性数据。这就涉及到聚合来自分布式应用的统计数据,以产生操作性数据的汇总信息流(feed)。

Log  Aggregation(日志聚合)

很多人把kafka作为日志聚合办法的替代品。日志聚合通常收集servers之外的物理性日志文件,并将它们集中到某个位置(一个文件server或者HDFS)进行处理。Kafka抽象出文件细节,并将日志或者事件数据的清晰抽象作为消息流的形式给出。这就允许延时更低的处理,更加容易的支持多数据源以及分布式数据消耗。对比日志集中系统(例如Scribe或者Flume),Kafka提供更均衡的性能、由备份带来的更强大的持久性保证、以及更低的端对端延迟。

Stream  Processing(流处理)

很多用户最终使用级间数据处理:数据源于存放原始数据的topics,然后聚合、增强,或者转换成新的Kafka topics以获得更深层次的consumption。例如,对于文章推荐的处理流程来说: 首先,从RSS信息流中抓取文章内容,然后将这些文章内容发布到名为”articles“的topic中;然后,更深层次的处理可能是规范化或者精简这些内容,目的是为了获得更简洁的内容,并将处理后的内容发往存放精简内容的topic名下;最后,可能是试图将这些内容与同于相匹配。这就产生一幅实时数据在独立的topics之间流动的图谱。  Storm和Samza等框架广泛的应用在实现这些转换。

Event  Sourcing(收集)

Event  sourcing应用设计模式是:状态的改变被记录为一系列以时间为顺序的日志。Kafka对大规模数据存储的支持使它成为有效的后台处理方式,用于处理上述设计。

Commit  Log(提交日志)

Kafka可以用于分布式系统完成外部提交日志的功能。这些日志有助于备份节点之间的数据,同时有助于完成同步机制--备份失败节点的数据。log compaction特征有助于实现这种应用;在这种应用中,Kafka类似于Apache  BookKeeper 项目。



1.3   Quick  Start

本指导假设你刚开始并没有安装Kafka或者ZooKeeper。

Step  1: Download  the  code

下载 0.8.1.1 发布版本,然后解压缩:

> tar -xzf kafka_2.9.2-0.8.1.1.tgz
> cd kafka_2.9.2-0.8.1.1   

Step 2: Start  the server

Kafka使用 ZooKeeper,因此你需要首先启动ZooKeeper server。你可以方便的使用脚本来启动。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

现在启动Kafka 服务器:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step 3: 创建 topic

我们可以创建一个topic,命名为“test”,并且只有一个partition,也只有一个备份。

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

现在可以看一下topic, 使用命令行:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
   test

同样,你可以选择配置你的brokers,当向一个不存在的topic发送消息时,使brokers可以自动创建相应的topics,而不是手动去创建。

Step4:发送消息

Kafka拥有命令行客户端, 既可以从文件获得输入,也可以从标准输入获得,然后把这些输入作为消息发送给Kafka集群。 默认的设置是,每一行作为一个单独的消息发送出去。

运行Producer, 然后向控制端输入一些消息并发送到服务器。

Step 5: 启动一个Consumer

kafka 也拥有一个命令行的consumer,这就可以将消息输出到标准输出。

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

如果你在不同的终端运行上述命令,你可以在producer 终端输入消息,然后在consumer终端会看到这些消息。

所有命令行工具都有其他的选项;运行命令时不要输入任何参数,将会输出更多的选项细节信息。

Step 6:设置多broker的集群

目前为止,我们上述的测试都是在单broker环境下测试的,但是这还不够有意思。对Kafka来说,单节点broker是含有一个节点的集群,因此,除非你启动更多broker实例,不然没有任何事情发生变化。 但是, 就是为了感受这些变化,我们将我们的集群扩展为3节点(不用担心机器不够,这3个节点都是部署在同一个机器上)。

首先,我们需要为每个broker创建配置文件:实现方式就是拷贝几份  config/server.properties,然后根据具体需求进行配置:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

现在可以编辑新配置文件,并按照下述进行设置:

config/server-1.properties:
                   broker.id=1
                   port=9093
                   log.dir=/tmp/kafka-logs-1

config/server-2.properties:
                   broker.id=2
                   port=9094
                   log.dir=/tmp/kafka-logs-2

broker.id 属性是唯一的,是集群中每个节点的永久性名字。 我们需要更改端口以及日志目录,只是因为我们在同一台机器上运行所有节点, 我们希望尽量避免所有brokers都注册相同的端口或者覆盖各自的数据。

我们前面步骤已使ZooKeeper 以及单节点已经启动,因此我们当前就只需要启动两个新节点:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

现在创建一个新topic,并且设置 replication factore为3

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好了,现在我们已有集群了,但是我们怎么知道是哪个brokers在运行。 运行“describe  topics"命令,来看一下:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1    ReplicationFactor:3    Configs:
      Topic: my-replicated-topic    Partition: 0    Leader: 1   Replicas: 1,2,0    Isr: 1,2,0

此处解释一下输出。第一行是所有partitions的总数,下面每一行都表示一个partition的信息。因为当前topic只有一个partition,因此只有一行。

“leader”  是负责针对给定partition读写操作的节点。每个节点都将成为随机选择的partitions的leader。

“replicas” 是给定partition备份节点的列表,无论这些备份节点是否leader,或者无论他们是否还活着。

“isr”   是同步的副本。这是副本列表的子集,即当前还活着的并且可以被leader联系到的。

注意: 我的例子中  1是topic中唯一一个partition的leader。

我们可以运行相同的命令行,用来查看我们最初创建的topic的信息:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test PartitionCount:1          ReplicationFactor:1   Configs:
         Topic: test   Partition: 0   Leader: 0           Replicas: 0          Isr: 0

毫无疑问,最初的topic没有副本备份,并且处于server 0上,就是当我们创建这个server时,我们的集群中仅有的server。

让我们向新topic发送一些消息:

> bin/kafka-console-producer.sh  --broker-list  localhost:9092   --topic  my-replicated-topic
...
my test message 1
my test message 2
^C

现在,可以消费这些消息

> bin/kafka-console-consumer.sh   --zookeeper  localhost:2181   --from-beginning  --topic  my-replicated-topic
...
my test message 1
my test message 2
^C

现在可以测试kafka 的容错性能,broker 1一直充当leader,因此我们可以杀掉他:

  > ps | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564

leadership 转向两个被领导者之一,同时node 1不再同步。

备份设置:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1ReplicationFactor:3Configs:
Topic: my-replicated-topic Partition: 0Leader: 2Replicas: 1,2,0Isr: 2,0

但是消息依然可以使用,即使最初执行写入的leader已经杀掉了。

> bin/kafka-console-consumer.sh  --zookeeper  localhost:2181   --from-beginning  --topic  my-replicated-topic
...
my test message 1
my test message 2
^C


1.4   Ecosystem

除主要的分布式功能之外,Kafka 可以和很多工具集成在一起。                 

ecosystem page列出很多可以集成的工具,包含数据流处理系统,Hadoop 集成、检测以及开发工具。


1.5  Upgrading  From Previous  Versions

从0.8.0 升级到0.8.1

0.8.1与0.8完全兼容,升级可以通过简单的停掉一个broker,然后升级代码,并重启它就可以了。

从0.7升级

0.8 发布版本中增加了备份功能,是第一个向后不兼容的发布版:主要改变主要在kafka  API, zookeeper 数据结构,以及协议,还有配置。从0.7升级到0.8.x需要特殊工具已进行数据迁移,迁移不必进行停止程序运行。


已有(1)人评论

跳转到指定楼层
yngwiewang 发表于 2017-11-6 16:05:21
学习了,谢谢。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条