Guiding questions:
1. How do you configure Kafka consumers?
2. Which consumer properties are the most important?
3. How does the configuration decide that a consumer is no longer alive?
4. What are Kafka's two partition assignment strategies?
Configuring Consumers
So far we have focused on learning the Consumer API, but we have only seen a few of the configuration properties: just the mandatory bootstrap.servers, group.id, key.deserializer, and value.deserializer. All the consumer configuration is documented in the Apache Kafka documentation: [http://kafka.apache.org/documentation.html#newconsumerconfigs]. Most of the parameters have reasonable defaults and do not require modification, but some have implications for the performance and availability of the consumers. Let's take a look at some of the more important properties:
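Before going through the individual properties, here is a minimal sketch of a consumer created with just the mandatory settings above; the broker address, group name, and topic name are placeholders, not values from the text:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
        props.put("group.id", "example-group");           // placeholder consumer group
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("example-topic"));  // placeholder topic
    }
}

The short snippets under the properties below add entries to this same props object before the consumer is constructed.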
fetch.min.bytes
This property lets a consumer specify the minimum amount of data it wants to receive from the broker when fetching records. If a broker receives a fetch request from a consumer but the new records amount to fewer bytes than fetch.min.bytes, the broker will wait until more messages are available before sending the records back to the consumer. This reduces the load on both the consumer and the broker, since they have to handle fewer back-and-forth messages when the topics don't have much new activity (or during lower-activity hours of the day). You will want to set this parameter higher than the default if the consumer is using too much CPU when there isn't much data available, or to reduce the load on the brokers when you have a large number of consumers.
fetch.max.wait.ms
By setting fetch.min.bytes you tell Kafka to wait until it has enough data to send before responding to the consumer. fetch.max.wait.ms lets you control how long to wait. By default, Kafka will wait up to 500 ms, which results in up to 500 ms of extra latency when there is not enough data flowing into the Kafka topic to satisfy the minimum amount of data to return. If you want to limit the potential latency (usually because SLAs cap the maximum latency of the application), you can set fetch.max.wait.ms to a lower value.
If you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive the fetch request from the consumer and respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.
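A sketch of setting these two properties together, using the 1 MB / 100 ms combination from the example above (the values are purely illustrative):

// Wait for at least 1 MB of new data per fetch...
props.put("fetch.min.bytes", "1048576");
// ...but respond after at most 100 ms even if less data has accumulated.
props.put("fetch.max.wait.ms", "100");

With this combination the broker responds as soon as either condition is met, whichever comes first.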
max.partition.fetch.bytes
This property controls the maximum number of bytes the server will return per partition.
The default is 1 MB, which means that when KafkaConsumer.poll() returns ConsumerRecords, the record object will use at most max.partition.fetch.bytes per partition assigned to the consumer. So if a topic has 20 partitions and you have 5 consumers, each consumer will need to have 4 MB of memory available for ConsumerRecords.
In practice you will want to allocate more memory, because each consumer will need to handle more partitions if other consumers in the group fail. max.partition.fetch.bytes must be larger than the largest message the broker will accept (the max.message.size property in the broker configuration), or the broker may have messages that the consumer is unable to consume, in which case the consumer will hang trying to read them. Another important consideration when setting max.partition.fetch.bytes is the amount of time it takes the consumer to process the data. As you recall, the consumer must call poll() frequently enough to avoid a session timeout and the subsequent rebalance. If the amount of data a single poll() returns is very large, it may take the consumer longer to process it, which means it will not get to the next iteration of the poll loop in time to avoid a session timeout. If this occurs, the two options are either to lower max.partition.fetch.bytes or to increase the session timeout.
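A sketch of the sizing reasoning above as a configuration line; the value shown is the default from the text, and the comments walk through the 20-partition / 5-consumer example:

// 1 MB per assigned partition (the default).
// With 20 partitions and 5 consumers, each consumer owns 4 partitions,
// so a single poll() can hold up to roughly 4 MB of records.
props.put("max.partition.fetch.bytes", "1048576");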
session.timeout.ms
The amount of time a consumer can be out of contact with the brokers while still being considered alive; the default is 3 seconds. If a consumer goes for more than session.timeout.ms without sending a heartbeat to the group coordinator, it is considered dead, and the group coordinator will trigger a rebalance of the consumer group to assign the partitions of the dead consumer to the other consumers in the group.
This property is closely related to heartbeat.interval.ms, which controls how frequently the consumer sends a heartbeat to the group coordinator, while session.timeout.ms controls how long a consumer can go without sending one. The two properties are therefore typically modified together: heartbeat.interval.ms must be lower than session.timeout.ms, and is usually set to one third of the timeout value. So if session.timeout.ms is 3 seconds, heartbeat.interval.ms should be 1 second. Setting session.timeout.ms lower than the default lets consumer groups detect and recover from failures sooner, but it may also cause unwanted rebalances when consumers take longer to complete the poll loop or garbage collection. Setting session.timeout.ms higher reduces the chance of accidental rebalances, but also means it will take longer to detect a real failure.
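A sketch following the 1/3 rule described above; the concrete numbers simply mirror the 3-second example in the text and are not a recommendation:

props.put("session.timeout.ms", "3000");      // how long a consumer may go without a heartbeat
props.put("heartbeat.interval.ms", "1000");   // roughly 1/3 of session.timeout.ms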
auto.offset.reset
This property controls the behavior of the consumer when it starts reading a partition for which it doesn't have a committed offset, or when the committed offset it has is invalid (usually because the consumer was down for so long that the records at that offset have already aged out of the broker). The default is "latest", which means that, lacking a valid offset, the consumer will start reading from the newest records (records written after the consumer started running). The alternative is "earliest", which means that, lacking a valid offset, the consumer will read all the data in the partition, starting from the very beginning.
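For example, a consumer that should replay a partition from the beginning whenever it has no valid committed offset could be configured like this (the default is "latest"):

props.put("auto.offset.reset", "earliest");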
enable.auto.commit
We discussed the different options for committing offsets earlier in this chapter. This parameter controls whether the consumer will commit offsets automatically, and it defaults to true. Set it to false if you prefer to control when offsets are committed, which is necessary to minimize duplicates and avoid missing data. If you set enable.auto.commit to true, you may also want to control how frequently offsets are committed with auto.commit.interval.ms.
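A minimal sketch of taking manual control of commits as described above: auto-commit is disabled and offsets are committed only after the records from a poll have been processed. The processing step is a placeholder, and ConsumerRecords/ConsumerRecord come from org.apache.kafka.clients.consumer, like the KafkaConsumer in the first snippet.

props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // process record.key() / record.value() here (placeholder)
    }
    consumer.commitSync();   // commit offsets only after the batch has been handled
}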
partition.assignment.strategy
We learned that partitions are assigned to the consumers in a consumer group. A PartitionAssignor is a class that, given the consumers and the topics they subscribed to, decides which partitions will be assigned to which consumer. By default, Kafka has two assignment strategies:
* Range: assigns each consumer a consecutive subset of partitions from each topic it subscribes to. So if consumers C1 and C2 are subscribed to two topics, T1 and T2, and each of the topics has 3 partitions, then C1 will be assigned partitions 0 and 1 from both T1 and T2, while C2 will be assigned partition 2 from those topics. Because each topic has an odd number of partitions and the assignment is done for each topic independently, the first consumer ends up with more partitions than the second. This happens whenever Range assignment is used and the number of consumers does not divide the number of partitions in each topic evenly.
* RoundRobin: takes all the partitions from all subscribed topics and assigns them to consumers sequentially, one by one. If C1 and C2 described above used RoundRobin assignment, C1 would get partitions 0 and 2 from topic T1 and partition 1 from topic T2, while C2 would get partition 1 from topic T1 and partitions 0 and 2 from topic T2. In general, if all consumers are subscribed to the same topics (a very common scenario), RoundRobin assignment ends up with all consumers owning the same number of partitions (or at most a one-partition difference).
partition.assignment.strategy allows you to choose a partition assignment strategy.
The default is org.apache.kafka.clients.consumer.RangeAssignor, which implements the Range strategy described above.
You can replace it with org.apache.kafka.clients.consumer.RoundRobinAssignor. A more advanced option is to implement your own assignment strategy, in which case partition.assignment.strategy should point to the name of your class.
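For instance, switching from the default Range strategy to RoundRobin is a single configuration line:

props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RoundRobinAssignor");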
client.id
This can be any string, and the brokers will use it to identify messages sent from the client. It is used in logging, metrics, and quotas.
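The string itself is arbitrary; something that identifies the application instance is a common choice (the value below is hypothetical):

props.put("client.id", "inventory-service-1");  // shows up in broker logs, metrics, and quota accounting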