本帖最后由 xingoo 于 2017-7-9 13:16 编辑
问题导读:
1 如何自定义Kafka序列化?
2 如何使用Avro进行序列化?
在前面的例子中,producer必须要设置序列化。我们已经看过如何使用默认的字符串序列化。kafka也提供了整型和字节数组,但是这些往往是不能满足大部分的用户需求的,因此最终你还是希望能以一些更通用的方法来序列化记录。
下面将会从如何自定义序列化开始,然后介绍一种推荐的序列化工具——Avro.
自定义序列化
如果你发送给kafka的数据不再是简单的String和整型,那么你可以选择一些通用的序列化工具,如Avro,Thrift或者Protobuf来创建记录,也可以通过自定义的序列化工具创建,强烈推荐使用一些通用的序列化工具。但是为了理解序列化的机制中的工作原理,我们就先来看看如何自定义序列化。
比如,序列化记录存储用户的名字,那么可以简单的创建下面的类:
[mw_shl_code=java,true]public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}[/mw_shl_code]
现在要为这个类创建序列化,可以像下面的代码:
[mw_shl_code=java,true]import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
/**
We are serializing Customer as:
4 byte int representing customerId
4 byte int representing length of customerName in UTF-8 bytes (0 if name isNull)
N bytes representing customerName in UTF-8
*/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializeName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer tobyte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}[/mw_shl_code]
在Producer的代码中配置序列化,使用自定义的序列化类,然后发送数据。另外,注意这个代码还是很脆弱的,比如我们有很多的customer,现在需要把id改成long型,或者增加一个新的日期字段,我们会发现很难跟老的消息进行兼容。在不同的版本间进行序列化和反序列化是一件很麻烦的事情,因为需要挨个字节进行对比。更糟糕的是,如果一个公司所有的组都在使用Kafka发送相同的数据,那么他们必须要使用同一个序列化工具,如果有代码需要修改,那么必须全部进行同步更新。
因此,我们推荐不要自定义新的序列化工具,而是使用类似Apache Avro、Thrift、Protobuf这种通用的工具。在下面的章节中,我们将会介绍如何使用Apache Avro对记录进行序列化。
使用Apache Avro序列化
Apache Avro是一种通用语言的序列化工具,这个项目是Doug Cutting创建用来在大量的客户端中间进行数据共享的。
Avro使用一种类似schema的东西来维护数据格式,schema一般都是用Json来描述,然后序列化成二进制文件。Avro在读取和写入文件的时候提供Schema的信息,并且是嵌入在数据我呢件的内部。
Avro还有一个特性使它更适合在Kafka这种消息系统中传输数据,就是写入消息的时候可以使用新的schema,而读取的时候还是按照之前的shcema读取也不会有任何问题。
比如原始的schema是下面的这种样子的:
[mw_shl_code=java,true]{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string""},
{"name": "faxNumber", "type": ["null", "string"], "default": "null"}
]
}[/mw_shl_code]
- id和name字段是必须要有的,其他的字段如fax是可选的,并且默认为null
如果这个schema已经使用一段时间了,并且已经产生了TB级的数据。随着时间的改变,21世纪的今天已经不需要使用传真了,因此我们使用新的email字段进行替代。
那么新的schema将会变成
[mw_shl_code=java,true]{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": "null"}
]
}[/mw_shl_code]
在升级到新的版本后,新的记录将会包含email字段,老的记录则会包含faxNumber字段,那么consumer读取到这样的数据后怎么处理呢?
读取数据的应用可能调用类似getName() getId()以及getFaxNumber()这种方法。如果遇到新的Schema的数据,getName()和getId()不会有什么问题,因为他们没有什么修改,但是getFaxNumber()将会返回null,因为现在已经没有faxNumber这个字段了。
现在假设我们已经升级了consumer的应用程序,现在getFaxNumber()方法被替换成了getEmail()方法。如果我们遇到了老的数据,getEmaill()方法将会返回null,因为这个记录里面还没有email字段。
最重要的事情是即使我们改变了消息的schema,也不需要改变读取数据的代码,不会有任何异常或者中断发生。
有两种需要知道的情况:
- 数据写入时使用的schema和数据读取的时候的schema必须要能匹配上。Avro的官网上面有例子。
- 在反序列化的时候需要知道数据的schema信息,Avro的schema是跟数据一起存储到文件的。但是在kafka中其实还有其他的更好的做法,稍后会有讲解
在Kafka中使用Avro
如果我们在Avro的数据文件中同时存储全部的Schema信息,那么记录的大小很可能会翻倍。然而,Avro在读取记录的时候是需要获得整个Schema的,因此需要能在任何地方都能读取到Schema.。为了达到这个目的,可以引入Schema的注册机制。思路就是将Schema的信息存储在注册中心,然后我们只会存储记录的Schema对应的唯一标识。当读取数据的时候,会根据这个唯一标识去注册中心读取Schema的信息,然后进行反序列化。所有的注册操作和读取操作都是由序列化和反序列化器来完成的。数据写入Kafka的时候只需要简单的引用下使用哪个序列化工具就可以 。
下面是一个在Kafka中基于Avro序列化数据的例子:
[mw_shl_code=java,true]Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);
// We keep producing new events until someone ctrl-c
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
producer.send(record);
}[/mw_shl_code]
- 使用KafkaAvroSerializer序列化对象
- schema.registry.url是一个新的参数,指定了存储schema信息的位置
- custmer是我们想要生成的对象
- 需要在ProducerRecord中指定记录的类型为Customer,并且基于Customer对象创建记录
- 然后使用Avro进行序列化发送数据
如果你想要用Avro序列化一些更通用的对象,那么也可以自定义Schema
[mw_shl_code=java,true]Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);
String schemaString = "{\"namespace\": \"customerManagement.avro\",\"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\", \"type\": [\"null\",\"string
\"], \"default\":\"null\" }" +
"]}";
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
String name = "exampleCustomer" + nCustomers;
String email = "example " + nCustomers + "@example.com"
GenericRecord customer = new GenericData.Record(schema);
customer.put("id", nCustomer);
customer.put("name", name);
customer.put("email", email);
ProducerRecord<String, GenericRecord> data = new ProducerRecord<String, GenericRecord>("customerContacts",name, customer);
producer.send(data);
}
}[/mw_shl_code]
- 同样使用KafkaAvroSerializer
- 提供注册中心的地址
- 现在需要提供Schema的信息
- 需要创建Avro的GenericRecord对象,然后使用shema和data进行初始化
- ProducerRecord里面的内容是一个GenericRecord对象,它包含了我们自定义的Schema和数据。序列化器会知道怎么从记录中获得schema,并且将schema存储到注册中心,然后序列化对象。
|
|