本帖最后由 pig2 于 2018-8-16 19:03 编辑
问题导读
1.对于不同团队负责写生产者和消费者,消息格式该如何商定?
2.什么样的格式性能更高?
3.如何实现序列化和反序列化?
关注最新经典文章,欢迎关注公众号
在Apache Kafka中,称为生产者的Java应用程序将结构化消息写入Kafka集群(由broker组成)。 同样,名为consumer的Java应用程序从同一个集群中读取这些消息。 在一些公司中,有不同的小组负责编写和管理生产者和消费者。 在这种情况下,一个主要的痛点可能是协调生产者和消费者之间商定的消息格式。
此示例演示如何使用Apache Avro序列化为Apache Kafka生成的记录,同时允许生成Schema和生成器和使用者应用程序的非同步更新。
序列化和反序列化
Kafka记录(以前称为消息)由键,值和标题组成。 Kafka不知道记录的Key和Value中的数据结构。 它将它们作为字节数组处理。 但是从Kafka读取记录的系统确实关心这些记录中的数据。 因此,需要以可读格式生成数据。 使用的数据格式应该是
- 紧凑
- 快速编码和解码
- 允许改变
- 允许上游系统(写入Kafka集群的系统)和下游系统(从同一Kafka集群读取的系统)在不同时间升级到更新的schema.
例如,JSON是自解释的,但不是紧凑的数据格式,并且解析速度慢。 Avro是一个快速序列化框架,可以创建相对紧凑的输出。 但要读取Avro记录,需要使用数据序列化的模式。
从对象到Kafka的记录,在由记录转换为可处理对象
生产者应用程序不需要将数据直接转换为字节数组。 KafkaProducer是一个泛型类,需要用户指定键和值类型。 然后,producer接受具有相同类型参数的ProducerRecord实例。 从对象到字节数组的转换由Serializer完成。 Kafka提供了一些原始序列化器:例如,IntegerSerializer,ByteArraySerializer,StringSerializer。 在消费者方面,类似的反序列化器将字节数组转换为应用程序可以处理的对象。
因此,在Serializer和Deserializer级别挂钩并允许生产者和消费者应用程序的开发人员使用Kafka提供的interface是有意义的。 虽然最新版本的Kafka允许ExtendedSerializers和ExtendedDeserializers访问header,但我们决定在Kafka记录的key和value中包含schema 标识符,而不是添加记录标头。
Avro 要点
Avro是一个数据序列化(和远程过程调用)框架。 它使用名为schema的JSON文档来描述数据结构。 大多数Avro使用的是GenericRecord或SpecificRecord的子类。从Avro模式生成的Java类是后者的子类,而前者可以在没有事先了解数据结构的情况下使用。
当两个模式满足一组兼容性规则时,用一个模式(称为写入器schema)编写的数据可以被读取,就好像它是用另一个模式编写的(称为读取器schema)。schema具有规范形式,其具有与序列化无关的所有细节,例如注释,被剥离以帮助等效性检查。
VersionedSchema和SchemaProvider
如前所述,我们需要在Schema及其标识符之间进行一对一映射。 有时,通过名称引用Schema更容易。 创建兼容Schema时,可以将其视为Schema的下一个版本。 因此,我们可以引用具有名称,版本对的Schema。 让我们将Schema,标识符,名称和版本一起调用VersionedSchema。 此对象可能包含应用程序所需的其他元数据。
[mw_shl_code=java,true]public class VersionedSchema {
private final int id;
private final String name;
private final int version;
private final Schema schema;
public VersionedSchema(int id, String name, int version, Schema schema) {
this.id = id;
this.name = name;
this.version = version;
this.schema = schema;
}
public String getName() {
return name;
}
public int getVersion() {
return version;
}
public Schema getSchema() {
return schema;
}
public int getId() {
return id;
}
}[/mw_shl_code]
SchemaProvider对象可以查找VersionedSchema的实例。
[mw_shl_code=java,true]public interface SchemaProvider extends AutoCloseable {
public VersionedSchema get(int id);
public VersionedSchema get(String schemaName, int schemaVersion);
public VersionedSchema getMetadata(Schema schema);
}[/mw_shl_code]
序列化通用数据
在序列化记录时,我们首先需要确定要使用的Schema。 每条记录都有一个getSchema方法。 但是从模式中找出标识符可能很耗时。 在初始化时设置架构通常更有效。 这可以通过标识符或名称和版本直接完成。 此外,在生成多个主题时,我们可能希望为不同的主题设置不同的Schema,并从作为参数提供的主题名称中找出方法序列化(T,String)。 为简洁起见,我们的示例中省略了该逻辑。
[mw_shl_code=java,true]
private VersionedSchema getSchema(T data, String topic) {
return schemaProvider.getMetadata( data.getSchema());
}[/mw_shl_code]
有了schema,我们需要将它存储在我们的消息中。 将ID作为消息的一部分进行序列化为我们提供了一个紧凑的解决方案,因为所有都发生在Serializer / Deserializer中。 它还可以非常轻松地与已经支持Kafka的其他框架和库集成,并允许用户使用自己的序列化程序(例如Spark)。
[mw_shl_code=java,true]private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
try (DataOutputStream os = new DataOutputStream(stream)) {
os.writeInt(id);
}
}[/mw_shl_code]
然后我们可以创建一个DatumWriter并序列化该对象。
[mw_shl_code=java,true]private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
datumWriter.write(data, encoder);
encoder.flush();
}[/mw_shl_code]
综上所述,我们实现了通用数据序列化器。
[mw_shl_code=java,true]public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {
private SchemaProvider schemaProvider;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
schemaProvider = SchemaUtils.getSchemaProvider(configs);
}
@Override
public byte[] serialize(String topic, T data) {
try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
VersionedSchema schema = getSchema(data, topic);
writeSchemaId(stream, schema.getId());
writeSerializedAvro(stream, data, schema.getSchema());
return stream.toByteArray();
} catch (IOException e) {
throw new RuntimeException("Could not serialize data", e);
}
}
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}
private VersionedSchema getSchema(T data, String topic) {...}
@Override
public void close() {
try {
schemaProvider.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}[/mw_shl_code]
反序列化通用数据
反序列化可以使用单个schema(schema数据是用其编写的),但你可以指定不同的读取器schema。 读取器schema必须与数据序列化的schema兼容,但不必相同。 出于这个原因,我们引入了schema名称。 我们现在可以指定我们想要使用特定版本的schema读取数据。 在初始化时,我们读取每个schema名称所需的模式版本,并将元数据存储在readerSchemasByName中以便快速访问。 现在,我们可以读取使用兼容版本的schema编写的每条记录,就好像它是使用指定版本编写的一样。
[mw_shl_code=java,true]@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}[/mw_shl_code]
当需要反序列化记录时,我们首先读取writer Schema的标识符。 这样可以按名称查找阅读器Schema。 有了这两种Schema,我们就可以创建一个GeneralDatumReader并读取记录。
[mw_shl_code=java,true]@Override
public GenericData.Record deserialize(String topic, byte[] data) {
try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
int schemaId = readSchemaId(stream);
VersionedSchema writerSchema = schemaProvider.get(schemaId);
VersionedSchema readerSchema =
readerSchemasByName.get(writerSchema.getName());
GenericData.Record avroRecord = readAvroRecord(stream,
writerSchema.getSchema(), readerSchema.getSchema());
return avroRecord;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private int readSchemaId(InputStream stream ) throws IOException {
try(DataInputStream is = new DataInputStream(stream)) {
return is.readInt();
}
}
private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
readerSchema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
GenericData.Record record = new GenericData.Record(readerSchema);
datumReader.read(record, decoder);
return record;
}[/mw_shl_code]
处理特定记录
通常,我们希望将一个类用于记录。 然后,此类通常从Avro架构生成。 Apache Avro提供了从模式生成Java代码的工具。 其中一个工具是Avro Maven插件。 生成的类具有从运行时可用的生成的模式。 这使序列化和反序列化更简单,更有效。 对于序列化,我们可以使用该类来查找要使用的模式标识符。
[mw_shl_code=java,true]@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
Class<?> recordClass = Class.forName(className);
Schema writerSchema = new
SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
} catch (Exception e) {
throw new RuntimeException(e);
}
}[/mw_shl_code]
因此,我们不需要逻辑来从主题和数据中确定schema。 我们使用记录类中可用的schema来编写记录。
类似地,对于反序列化,可以从类本身中找到读取器schema。 反序列化逻辑变得更简单,因为读取器schema在配置时是固定的,不需要通过模式名称查找。
[mw_shl_code=java,true]@Override
public T deserialize(String topic, byte[] data) {
try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
int schemaId = readSchemaId(stream);
VersionedSchema writerSchema = schemaProvider.get(schemaId);
return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
return datumReader.read(null, decoder);
}[/mw_shl_code]
|
|