分享

第1部分:Kafka不同团队开发生产者和消费者,该如何定义消息格式

pig2 2018-8-16 18:52:05 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 10430
本帖最后由 pig2 于 2018-8-16 19:03 编辑
问题导读

1.对于不同团队负责写生产者和消费者,消息格式该如何商定?
2.什么样的格式性能更高?
3.如何实现序列化和反序列化?

关注最新经典文章,欢迎关注公众号


在Apache Kafka中,称为生产者的Java应用程序将结构化消息写入Kafka集群(由broker组成)。 同样,名为consumer的Java应用程序从同一个集群中读取这些消息。 在一些公司中,有不同的小组负责编写和管理生产者和消费者。 在这种情况下,一个主要的痛点可能是协调生产者和消费者之间商定的消息格式。

此示例演示如何使用Apache Avro序列化为Apache Kafka生成的记录,同时允许生成Schema和生成器和使用者应用程序的非同步更新。

序列化和反序列化
Kafka记录(以前称为消息)由键,值和标题组成。 Kafka不知道记录的Key和Value中的数据结构。 它将它们作为字节数组处理。 但是从Kafka读取记录的系统确实关心这些记录中的数据。 因此,需要以可读格式生成数据。 使用的数据格式应该是
  • 紧凑
  • 快速编码和解码
  • 允许改变
  • 允许上游系统(写入Kafka集群的系统)和下游系统(从同一Kafka集群读取的系统)在不同时间升级到更新的schema.



例如,JSON是自解释的,但不是紧凑的数据格式,并且解析速度慢。 Avro是一个快速序列化框架,可以创建相对紧凑的输出。 但要读取Avro记录,需要使用数据序列化的模式。

从对象到Kafka的记录,在由记录转换为可处理对象
生产者应用程序不需要将数据直接转换为字节数组。 KafkaProducer是一个泛型类,需要用户指定键和值类型。 然后,producer接受具有相同类型参数的ProducerRecord实例。 从对象到字节数组的转换由Serializer完成。 Kafka提供了一些原始序列化器:例如,IntegerSerializer,ByteArraySerializer,StringSerializer。 在消费者方面,类似的反序列化器将字节数组转换为应用程序可以处理的对象。

因此,在Serializer和Deserializer级别挂钩并允许生产者和消费者应用程序的开发人员使用Kafka提供的interface是有意义的。 虽然最新版本的Kafka允许ExtendedSerializers和ExtendedDeserializers访问header,但我们决定在Kafka记录的key和value中包含schema 标识符,而不是添加记录标头。


Avro 要点
Avro是一个数据序列化(和远程过程调用)框架。 它使用名为schema的JSON文档来描述数据结构。 大多数Avro使用的是GenericRecord或SpecificRecord的子类。从Avro模式生成的Java类是后者的子类,而前者可以在没有事先了解数据结构的情况下使用。


当两个模式满足一组兼容性规则时,用一个模式(称为写入器schema)编写的数据可以被读取,就好像它是用另一个模式编写的(称为读取器schema)。schema具有规范形式,其具有与序列化无关的所有细节,例如注释,被剥离以帮助等效性检查。

VersionedSchema和SchemaProvider
如前所述,我们需要在Schema及其标识符之间进行一对一映射。 有时,通过名称引用Schema更容易。 创建兼容Schema时,可以将其视为Schema的下一个版本。 因此,我们可以引用具有名称,版本对的Schema。 让我们将Schema,标识符,名称和版本一起调用VersionedSchema。 此对象可能包含应用程序所需的其他元数据。

[mw_shl_code=java,true]public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }
   
  public int getId() {
    return id;
  }
}[/mw_shl_code]
SchemaProvider对象可以查找VersionedSchema的实例。
[mw_shl_code=java,true]public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}[/mw_shl_code]

序列化通用数据
在序列化记录时,我们首先需要确定要使用的Schema。 每条记录都有一个getSchema方法。 但是从模式中找出标识符可能很耗时。 在初始化时设置架构通常更有效。 这可以通过标识符或名称和版本直接完成。 此外,在生成多个主题时,我们可能希望为不同的主题设置不同的Schema,并从作为参数提供的主题名称中找出方法序列化(T,String)。 为简洁起见,我们的示例中省略了该逻辑。
[mw_shl_code=java,true]
private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata( data.getSchema());
}[/mw_shl_code]
有了schema,我们需要将它存储在我们的消息中。 将ID作为消息的一部分进行序列化为我们提供了一个紧凑的解决方案,因为所有都发生在Serializer / Deserializer中。 它还可以非常轻松地与已经支持Kafka的其他框架和库集成,并允许用户使用自己的序列化程序(例如Spark)。
[mw_shl_code=java,true]private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
    try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}[/mw_shl_code]
然后我们可以创建一个DatumWriter并序列化该对象。
[mw_shl_code=java,true]private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}[/mw_shl_code]

综上所述,我们实现了通用数据序列化器。
[mw_shl_code=java,true]public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
   
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}[/mw_shl_code]

反序列化通用数据
反序列化可以使用单个schema(schema数据是用其编写的),但你可以指定不同的读取器schema。 读取器schema必须与数据序列化的schema兼容,但不必相同。 出于这个原因,我们引入了schema名称。 我们现在可以指定我们想要使用特定版本的schema读取数据。 在初始化时,我们读取每个schema名称所需的模式版本,并将元数据存储在readerSchemasByName中以便快速访问。 现在,我们可以读取使用兼容版本的schema编写的每条记录,就好像它是使用指定版本编写的一样。
[mw_shl_code=java,true]@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}[/mw_shl_code]
当需要反序列化记录时,我们首先读取writer Schema的标识符。 这样可以按名称查找阅读器Schema。 有了这两种Schema,我们就可以创建一个GeneralDatumReader并读取记录。
[mw_shl_code=java,true]@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {

    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);

    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private int readSchemaId(InputStream stream ) throws IOException {
  try(DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}

private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
      readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  GenericData.Record record = new GenericData.Record(readerSchema);
  datumReader.read(record, decoder);
  return record;
}[/mw_shl_code]

处理特定记录
通常,我们希望将一个类用于记录。 然后,此类通常从Avro架构生成。 Apache Avro提供了从模式生成Java代码的工具。 其中一个工具是Avro Maven插件。 生成的类具有从运行时可用的生成的模式。 这使序列化和反序列化更简单,更有效。 对于序列化,我们可以使用该类来查找要使用的模式标识符。
[mw_shl_code=java,true]@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new
        SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}[/mw_shl_code]
因此,我们不需要逻辑来从主题和数据中确定schema。 我们使用记录类中可用的schema来编写记录。
类似地,对于反序列化,可以从类本身中找到读取器schema。 反序列化逻辑变得更简单,因为读取器schema在配置时是固定的,不需要通过模式名称查找。

[mw_shl_code=java,true]@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}[/mw_shl_code]




本帖被以下淘专辑推荐:

已有(3)人评论

跳转到指定楼层
jiangzi 发表于 2018-8-16 19:33:48
很好的工作实践~~~
回复

使用道具 举报

swinghu 发表于 2018-8-18 18:29:37
很好的工作实践~~~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条