问题导读
1.什么是序列化,反序列化?
2.kafka中使用串行器、解释器要注意什么?
3.如何自定义解释器?
4.消费者端如何使用AVRO就行反序列化?
Deserializers 解释器
正如在前面的章节中,Kafka的生产者需要序列化程序来将对象转换为字节数组,然后发送到Kafka。同样,Kafka的消费者需要将从Kafka接收到的字节数组反序列化转换成java对象。在之前的例子中,我们假设每条消息都是键值类型为字符串型,在消费者端默认使用StringDeserializer。
在前一章对Kafka的生产者,我们已经看到了如何序列化自定义类型以及如何使用Avro和Avroserializers从模式定义生成Avro对象然后序列化产生的消息发送到Kafka。现在我们来看看如何反序列化你自己创建的自定义对象以及如何使用Avro其反序列化。
很明显,用于Kafka生产事件的序列化必须和用于消费者事件的反序列化相匹配。序列化使用IntSerializer,反序列化使用StringDeserializer是无法工作的。这意味着,作为一个开发者,你需要跟踪哪些串行器被用在哪些主题上,并确保每个主题只包含你使用解释器可以解析的数据。这是一个使用Avro和序列化和反序列化的好处-Avroserializer可以确保所有的数据写入到一个特定的主题与并与主题中的数据模式兼容,这意味着它可以通过解释器和模式被反序列化。兼容性中的任何错误——在生产者或消费者方面将很容易地被一个适当的错误消息捕获,这意味着你不需要尝试调试字节数组用于串行化错误。我们将开始迅速展示如何编写一个自定义序列化器,虽然这是不推荐的方法,然后我们转到一个使用Avro反序列化消息键和值的例子。
Custom Deserializers 自定义解释器 我们取第3章相同的序列化的自定义对象,然后给他写一个解释器。 [mw_shl_code=java,false]public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}[/mw_shl_code] 解释器如下所示: [mw_shl_code=java,false]import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerDeserializer implements Deserializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
public Customer deserialize(String topic, byte[] data) {
int id;
int nameSize;
String name;
try {
if (data == null)
return null;
if (data.length < 8)
throw new SerializationException("Size of data received by IntegerDeserializer
is shorter than expected");
ByteBuffer buffer = ByteBuffer.wrap(data);
id = buffer.getInt();
String nameSize = buffer.getInt();
byte[] nameBytes = new Array[Byte](nameSize);
buffer.get(nameBytes);
name = new String(nameBytes, 'UTF-8');
return new Customer(id, name);
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to
byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}[/mw_shl_code] 消费者端使用序列化的代码如下: [mw_shl_code=java,false]Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.Customer-
Deserializer");
KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
consumer.subscribe("customerCountries")
while (true) {
ConsumerRecords<String, Customer> records = consumer.poll(100);
for (ConsumerRecord<String, Customer> record : records)
{
System.out.println("current customer Id: " + record.value().getId() + " and
current customer name: " + record.value().getName());
}
}[/mw_shl_code] 再次,需要注意的是,不建议实现自定义串行器解串器。它与生产者和消费者紧密耦合并且是不稳定, 容易出错的。一个更好的解决方案是使用一个标准的消息格式化框架,诸如Thrift、protobuf或Avro。现在我们将看到如何通过Kafka消费者使用Avro反序列化器。关于Apache Avro的背景,其模式和兼容性等功能,请参照3章。
通过Kafka Consumer使用Avro反序列化。假设我们使用的是第3章Avro消费者的实现类。为了从kafka中消费这些对象,实现类似于此的消费应用程序: [mw_shl_code=java,false]Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");//KafkaAvroDeserializer反序列化Avro消息
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"
KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers,
groupId, url));
consumer.subscribe(Collections.singletonList(topic));
System.out.println("Reading topic:" + topic);
while (true) {
ConsumerRecords<String, Customer> records = consumer.poll(1000);
for (ConsumerRecord<String, Customer> record: records) {
System.out.println("Current customer name is: " + record.value().get-
Name());
}
consumer.commitSync();
}[/mw_shl_code]
|