本帖最后由 xingoo 于 2017-5-13 01:05 编辑
问题导读:
1 Broker有哪些基本的配置?
2 Broker有哪些需要了解的配置?
Borker配置
前面章节中的例子,用来作为单个节点的服务器示例是足够的,但是如果想要把它应用到生产环境,就远远不够了。在Kafka中有很多参数可以控制它的运行和工作。大部分的选项都可以忽略直接使用默认值就好,遇到一些特殊的情况你可以再考虑使用它们。
Broker的一般配置
有很多参数在部署集群模式时需要引起重视,这些参数都是broker最基本的配置,很多参数都需要依据集群的broker情况而变化。
broker.id
每个kafka的broker都需要有一个整型的唯一标识,这个标识通过broker.id来设置。默认的情况下,这个数字是0,但是它可以设置成任何值。需要注意的是,需要保证集群中这个id是唯一的。这个值是可以任意填写的,并且可以在必要的时候从broker集群中删除。比较好的做法是使用主机名相关的标识来做为id,比如,你的主机名当中有数字相关的信息,如hosts1.example.com,host2.example.com,那么这个数字就可以用来作为broker.id的值。
port
默认启动kafka时,监听的是TCP的9092端口,端口号可以被任意修改。如果端口号设置为小于1024,那么kafka需要以root身份启动。但是并不推荐以root身份启动。
zookeeper.connect
这个参数指定了Zookeeper所在的地址,它存储了broker的元信息。在前一章节的例子 中,Zookeeper是运行在本机的2181端口上,因此这个值被设置成localhost:2181。这个值可以通过分号设置多个值,每个值的格式都是hostname:port/path,其中每个部分的含义如下:
- hostname是zookeeper服务器的主机名或者ip地址
- port是服务器监听连接的端口号
- /path是kafka在zookeeper上的根目录。如果缺省,会使用根目录。
如果设置了chroot,但是它又不存在,那么broker会在启动的时候直接创建。
PS:为什么使用Chroot路径
一个普遍认同的最佳实践就是kafka集群使用chroot路径,这样zookeeper可以与其他应用共享使用,而不会有任何冲突。指定多个zookeeper服务器的地址也是比较好的做法,这样当zookeeper集群中有节点失败的时候,还可以正常连接其他的节点。
log.dirs
这个参数用于配置Kafka保存数据的位置,Kafka中所有的消息都会存在这个目录下。可以通过逗号来指定多个目录,kafka会根据最少被使用的原则选择目录分配新的parition。注意kafka在分配parition的时候选择的规则不是按照磁盘的空间大小来定的,而是分配的parition的个数多小。
num.recovery.thread.per.data.dir
kafka可以配置一个线程池,线程池的使用场景如下:
- 当正常启动的时候,开启每个parition的文档块segment
- 当失败后重启时,检查parition的文档块
- 当关闭kafka的时候,清除关闭文档块
默认,每个目录只有一个线程。最好是设置多个线程数,这样在服务器启动或者关闭的时候,都可以并行的进行操作。尤其是当非正常停机后,重启时,如果有大量的分区数,那么启动broker将会花费大量的时间。注意,这个参数是针对每个目录的。比如,num.recovery.threads.per.data.dir设置为8,如果有3个log.dirs路径,那么一共会有24个线程。
auto.create.topics.enable
在下面场景中,按照默认的配置,如果还没有创建topic,kafka会在broker上自动创建topic:
- 当producer向一个topic中写入消息时
- 当cosumer开始从某个topic中读取数据时
- 当任何的客户端请求某个topic的信息时
在很多场景下,这都会引发莫名其妙的问题。尤其是没有什么办法判断某个topic是否存在,因为任何请求都会创建该topic。如果你想严格的控制topic的创建,那么可以设置auto.create.topics.enable为false。
默认的主题配置Topic Defaults
kafka集群在创建topic的时候会设置一些默认的配置,这些参数包括分区的个数、消息的容错机制,这些信息都可以通过管理员工具以topic为单位进行配置。kafka为我们提供的默认配置,基本也能满足大多数的应用场景了。
PS:使用per.topic进行参数的覆盖
在之前的版本中,可以通过log.retention.hours.per.topic,log.retention.bytes.per.topic, log.segment.bytes.per.topic等覆盖默认的配置。现在的版本不能这么用了,必须通过管理员工具进行设置。
num.partitions
这个参数用于配置新创建的topic有多少个分区,默认是1个。注意partition的个数只可以被增加,不能被减少。这就意味着如果想要减少主题的分区数,那么就需要重新创建topic。
在第一章中介绍过,kafka通过分区来对topic进行扩展,因此需要使用分区的个数来做负载均衡,如果新增了broker,那么就会引发重新负载分配。这并不意味着所有的主题的分区数都需要大于broker的数量,因为kafka是支持多个主题的,其他的主题会使用其余的broker。需要注意的是,如果消息的吞吐量很高,那么可以通过设置一个比较大的分区数,来分摊压力。
log.retention.ms
这个参数用于配置kafka中消息保存的时间,也可以使用log.retention.hours,默认这个参数是168个小时,即一周。另外,还支持log.retention.minutes和log.retention.ms。这三个参数都会控制删除过期数据的时间,推荐还是使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。
PS:过期时间和最后修改时间
过期时间是通过每个log文件的最后修改时间来定的。在正常的集群操作中,这个时间其实就是log段文件关闭的时间,它代表了最后一条消息进入这个文件的时间。然而,如果通过管理员工具,在brokers之间移动了分区,那么这个时候会被刷新,就不准确了。这就会导致本该过期删除的文件,被继续保留了。
log.retention.bytes
这个参数也是用来配置消息过期的,它会应用到每个分区,比如,你有一个主题,有8个分区,并且设置了log.retention.bytes为1G,那么这个主题总共可以保留8G的数据。注意,所有的过期配置都会应用到patition粒度,而不是主题粒度。这也意味着,如果增加了主题的分区数,那么主题所能保留的数据也就随之增加了。
PS:通过大小和时间配置数据过期
如果设置了log.retention.bytes和log.retention.ms(或者其他过期时间的配置),只要满足一个条件,消息就会被删除。比如,设置log.retention.ms是86400000(一天),并且log.retention.bytes是1000000000(1G),那么只要修改时间大于一天或者数据量大于1G,这部分数据都会被删除。
log.segment.bytes
这个参数用来控制log段文件的大小,而不是消息的大小。在kafka中,所有的消息都会进入broker,然后以追加的方式追加到分区当前最新的segment段文件中。一旦这个段文件到达了log.segment.bytes设置的大小,比如默认的1G,这个段文件就会被关闭,然后创建一个新的。一旦这个文件被关闭,就可以理解成这个文件已经过期了。这个参数设置的越小,那么关闭文件创建文件的操作就会越频繁,这样也会造成大量的磁盘读写的开销。
通过生产者发送过来的消息的情况可以判断这个值的大小。比如,主题每天接收100M的消息,并且log.segment.bytes为默认设置,那么10天后,这个段文件才会被填满。由于段文件在没有关闭的时候,是不能删除的,log.retention.ms又是默认的设置,那么这个消息将会在17天后,才过期删除。因为10天后,段文件才关闭。再过7天,这个文件才算真正过期,才能被清除。
PS:根据时间戳追踪offset
段文件的大小也会影响到消息消费offset的操作,因为读取某一时间的offset时,kafka会寻找关闭时间晚于offset时间的那个段文件。然后返回offset所在的段文件的第一个消息的offset,然后按照偏移值查询目标的消息。因此越小的段文件,通过时间戳消费offset的时候就会越精确。
log.segment.ms
这个参数也可以控制段文件关闭的时间,它定义了经过多长时间段文件会被关闭。跟log.retention.bytes和log.retention.ms类似,log.segment.ms和log.segment.bytes也不是互斥的。kafka会在任何一个条件满足时,关闭段文件。默认情况下,是不会设置Log.segment.ms的,也就意味着只会通过段文件的大小来关闭文件。
PS:基于时间关闭段文件的磁盘性能需求
当时使用基于时间的段文件限制,对磁盘的要求会很高。这是因为,一般情况下如果段文件大小这个条件不满足,会按照时间限制来关闭文件,此时如果分区数很多,主题很多,将会有大量的段文件同时关闭,同时创建。
message.max.bytes
这个参数用于限制生产者消息的大小,默认是1000000,也就是1M。生产者在发送消息给broker的时候,如果出错,会尝试重发;但是如果是因为大小的原因,那生产者是不会重发的。另外,broker上的消息可以进行压缩,这个参数可以使压缩后的大小,这样能多存储很多消息。
需要注意的是,允许发送更大的消息会对性能有很大影响。更大的消息,就意味着broker在处理网络连接的时候需要更长的时间,它也会增加磁盘的写操作压力,影响IO吞吐量。
PS:配合消息大小的设置
消息大小的参数message.max.bytes一般都和消费者的参数fetch.message.max.bytes搭配使用。如果fetch.message.max.bytes小于message.max.bytes,那么当消费者遇到很大的消息时,将会无法消费这些消息。同理,在配置cluster时replica.fetch.max.bytes也是一样的道理。
|
|