
kafka权威指南 第四章第2、3节 创建kafka消费者并订阅Topics

开始消费的第一步是创建一个KafkaConsumer实例,创建一个KafkaConsumer非常类似于创建一个KafkaProducer,你可以创建一个Java属性实例,并使用你希望传递给使用者的属性。我们将在后面的章节中详细讨论所有的属性。首先,我们只需要使用3个属性:bootstrap.servers, key.deserializer 和 value.deserializer.



这里唯一的新属性是group.id - 这是消费者组的名字,这是消费者的一部分。



Creating a Kafka Consumer
The first step to start consuming records is to create a KafkaConsumer instance. Creating a KafkaConsumer is very similar to creating a KafkaProducer - you create a Java Properties instance with the properties you want to pass to the consumer. We will discuss all the properties in depth later in the chapter. To start we just need to use the 3 mandatory properties: bootstrap.servers, key.deserializer and value.deserializer.

The first property, bootstrap.servers is the connection string to Kafka cluster. It is used the exact same way it is used in KafkaProducer, and you can refer to Chapter 3 to see specific details on how this is defined. The other two properties key.deserializer and value.deserializer are similar to the serializers defined for the producer,but rather than specifying classes that turn Java objects to a ByteArray, you need to specify classes that can take a ByteArray and turn it into a Java object.

There is a fourth property, which is not strictly mandatory, but for now we will pretend it is. The property is group.id and it specifies the Consumer Group the Kafka‐Consumer instance belongs to. While it is possible to create consumers that do not belong to any consumer group, this is far less common and for most of the chapter we will assume the consumer is part of a group.

The following code snippet shows how to create a KafkaConsumer:
[mw_shl_code=java,true]Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
Most of what you see here should be very familiar if you’ve read Chapter 3 on creating producers. We are planning on consuming Strings as both key and value, so we use the built-in StringDeserializer and we create KafkaConsumer with String types.

The only new property here is group.id - which is the name of the consumer group this consumer will be part of.

Subscribing to Topics
Once we created a consumer, the next step is to subscribe to one or more topics. The subcribe() method takes a list of topics as a parameter, so its pretty simple to use:
Here we simply create a list with a single element, the topic name “customer‐Countries”It is also possible to call subscribe with a regular expression. The expression can match multiple topic names and if someone creates a new topic with a name that matches, a rebalance will happen almost immediately and the consumers will start consuming from the new topic. This is useful for applications that need to consume from multiple topics and can handle the different types of data the topics will contain. It is most common in applications that replicate data between Kafka and another system.

To subscribe to all test topics, we can call:



