分享

Flink实现自定义Avro序列化(Source/Sink)到kafka

问题导读:
1、Avro提供的技术支持包括哪些?
2、Avro优点有哪些?
3、如何使用Java自定义序列化到kafka?
4、Flink如何实现Avro自定义序列化到Kafka?


前言

最近一直在研究如果提高kafka中读取效率,之前一直使用字符串的方式将数据写入到kafka中。当数据将特别大的时候发现效率不是很好,偶然之间接触到了Avro序列化,发现kafka也是支持Avro的方式于是就有了本篇文章。

环境所依赖的pom文件

  1. <dependencies>
  2.         <dependency>
  3.             <groupId>org.apache.avro</groupId>
  4.             <artifactId>avro</artifactId>
  5.             <version>1.8.2</version>
  6.         </dependency>
  7.         <dependency>
  8.             <groupId>org.apache.flink</groupId>
  9.             <artifactId>flink-scala_2.12</artifactId>
  10.             <version>1.10.1</version>
  11.         </dependency>
  12.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
  13.         <dependency>
  14.             <groupId>org.apache.flink</groupId>
  15.             <artifactId>flink-streaming-scala_2.12</artifactId>
  16.             <version>1.10.1</version>
  17.         </dependency>
  18.         <dependency>
  19.             <groupId>org.apache.flink</groupId>
  20.             <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
  21.             <version>1.10.1</version>
  22.         </dependency>
  23.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
  24.         <dependency>
  25.             <groupId>org.apache.flink</groupId>
  26.             <artifactId>flink-avro</artifactId>
  27.             <version>1.10.1</version>
  28.         </dependency>
  29.         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  30.         <dependency>
  31.             <groupId>org.apache.kafka</groupId>
  32.             <artifactId>kafka-clients</artifactId>
  33.             <version>1.0.0</version>
  34.         </dependency>
  35.         <dependency>
  36.             <groupId>org.apache.kafka</groupId>
  37.             <artifactId>kafka-streams</artifactId>
  38.             <version>1.0.0</version>
  39.         </dependency>
  40.     </dependencies>
  41.     <build>
  42.         <plugins>
  43.             <plugin>
  44.                 <groupId>org.apache.avro</groupId>
  45.                 <artifactId>avro-maven-plugin</artifactId>
  46.                 <version>1.8.2</version>
  47.                 <executions>
  48.                     <execution>
  49.                         <phase>generate-sources</phase>
  50.                         <goals>
  51.                             <goal>schema</goal>
  52.                         </goals>
  53.                         <configuration>
  54.                             <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
  55.                             <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
  56.                         </configuration>
  57.                     </execution>
  58.                 </executions>
  59.             </plugin>
  60.             <plugin>
  61.                 <groupId>org.apache.maven.plugins</groupId>
  62.                 <artifactId>maven-compiler-plugin</artifactId>
  63.                 <configuration>
  64.                     <source>1.6</source>
  65.                     <target>1.6</target>
  66.                 </configuration>
  67.             </plugin>
  68.         </plugins>
  69.     </build>
复制代码


一、Avro提供的技术支持包括以下五个方面:

  •     优秀的数据结构;
  •     一个紧凑的,快速的,二进制数据格式;
  •     一个容器文件,用来存储持久化数据;
  •     RPC远程过程调用;
  •     集成最简单的动态语言。读取或者写入数据文件,使用或实现RPC协议均不需要代码实现。对于静态- - 语言编写的话需要实现;

二、Avro优点

  •     二进制消息,性能好/效率高
  •     使用JSON描述模式
  •     模式和数据统一存储,消息自描述,不需要生成stub代码(支持生成IDL)
  •     RPC调用在握手阶段交换模式定义
  •     包含完整的客户端/服务端堆栈,可快速实现RPC
  •     支持同步和异步通信
  •     支持动态消息
  •     模式定义允许定义数据的排序(序列化时会遵循这个顺序)
  •     提供了基于Jetty内核的服务基于Netty的服务

三、Avro Json格式介绍

  1. {
  2.     "namespace": "com.avro.bean",
  3.     "type": "record",
  4.     "name": "UserBehavior",
  5.     "fields": [
  6.         {"name": "userId", "type": "long"},
  7.         {"name": "itemId",  "type": "long"},
  8.         {"name": "categoryId", "type": "int"},
  9.         {"name": "behavior", "type": "string"},
  10.         {"name": "timestamp", "type": "long"}
  11.     ]
  12. }
复制代码

  •     namespace : 要生成的目录
  •     type : 类型 avro 使用 record
  •     name : 会自动生成对应的对象
  •     fields : 要指定的字段

注意: 创建的文件后缀名一定要叫 avsc

我们使用idea 生成 UserBehavior 对象
20210108195633291.gif

四、使用Java自定义序列化到kafka

         首先我们先使用 Java编写Kafka客户端写入数据和消费数据。

4.1 准备测试数据

  1.     543462,1715,1464116,pv,1511658000
  2.     662867,2244074,1575622,pv,1511658000
  3.     561558,3611281,965809,pv,1511658000
  4.     894923,3076029,1879194,pv,1511658000
  5.     834377,4541270,3738615,pv,1511658000
  6.     315321,942195,4339722,pv,1511658000
  7.     625915,1162383,570735,pv,1511658000
复制代码



4.2 自定义Avro 序列化和反序列化

首先我们需要实现2个类分别为Serializer和Deserializer分别是序列化和反序列化

  1. package com.avro.AvroUtil;
  2. import com.avro.bean.UserBehavior;
  3. import org.apache.avro.io.BinaryDecoder;
  4. import org.apache.avro.io.BinaryEncoder;
  5. import org.apache.avro.io.DecoderFactory;
  6. import org.apache.avro.io.EncoderFactory;
  7. import org.apache.avro.specific.SpecificDatumReader;
  8. import org.apache.avro.specific.SpecificDatumWriter;
  9. import org.apache.kafka.common.serialization.Deserializer;
  10. import org.apache.kafka.common.serialization.Serializer;
  11. import java.io.ByteArrayInputStream;
  12. import java.io.ByteArrayOutputStream;
  13. import java.io.IOException;
  14. import java.util.Map;
  15. /**
  16. * @author 大数据老哥
  17. * @version V1.0
  18. * @Package com.avro.AvroUtil
  19. * @File :SimpleAvroSchemaJava.java
  20. * @date 2021/1/8 20:02
  21. */
  22. /**
  23. *  自定义序列化和反序列化
  24. */
  25. public class SimpleAvroSchemaJava implements Serializer<UserBehavior>, Deserializer<UserBehavior> {
  26.    
  27.     @Override
  28.     public void configure(Map<String, ?> map, boolean b) {
  29.     }
  30.     //序列化方法
  31.     @Override
  32.     public byte[] serialize(String s, UserBehavior userBehavior) {
  33.         // 创建序列化执行器
  34.         SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
  35.          // 创建一个流 用存储序列化后的二进制文件
  36.         ByteArrayOutputStream out = new ByteArrayOutputStream();
  37.         // 创建二进制编码器
  38.         BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
  39.         try {
  40.             // 数据入都流中
  41.             writer.write(userBehavior, encoder);
  42.         } catch (IOException e) {
  43.             e.printStackTrace();
  44.         }
  45.         return out.toByteArray();
  46.     }
  47.     @Override
  48.     public void close() {
  49.     }
  50.     //反序列化
  51.     @Override
  52.     public UserBehavior deserialize(String s, byte[] bytes) {
  53.         // 用来保存结果数据
  54.         UserBehavior userBehavior = new UserBehavior();
  55.         // 创建输入流用来读取二进制文件
  56.         ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
  57.         // 创建输入序列化执行器
  58.         SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
  59.         //创建二进制解码器
  60.         BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
  61.         try {
  62.             // 数据读取
  63.             userBehavior=stockSpecificDatumReader.read(null, binaryDecoder);
  64.         } catch (IOException e) {
  65.             e.printStackTrace();
  66.         }
  67.         // 结果返回
  68.         return userBehavior;
  69.     }
  70. }
复制代码

4.3 创建序列化对象

  1. package com.avro.kafka;
  2. import com.avro.bean.UserBehavior;
  3. import org.apache.kafka.clients.producer.KafkaProducer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.io.BufferedReader;
  6. import java.io.FileReader;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. import java.util.Properties;
  10. /**
  11. * @author 大数据老哥
  12. * @version V1.0
  13. * @Package com.avro.kafka
  14. * @File :UserBehaviorProducerKafka.java
  15. * @date 2021/1/8 20:14
  16. */
  17. public class UserBehaviorProducerKafka {
  18.     public static void main(String[] args) throws InterruptedException {
  19.         // 获取数据
  20.         List<UserBehavior> data = getData();
  21.         // 创建配置文件
  22.         Properties props = new Properties();
  23.         props.setProperty("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
  24.         props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  25.         props.setProperty("value.serializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");
  26.         // 创建kafka的生产者
  27.         KafkaProducer<String, UserBehavior> userBehaviorProducer = new KafkaProducer<String, UserBehavior>(props);
  28.         // 循环遍历数据
  29.         for (UserBehavior userBehavior : data) {
  30.             ProducerRecord<String, UserBehavior> producerRecord = new ProducerRecord<String, UserBehavior>("UserBehaviorKafka", userBehavior);
  31.             userBehaviorProducer.send(producerRecord);
  32.             System.out.println("数据写入成功"+data);
  33.             Thread.sleep(1000);
  34.         }
  35.     }
  36.     public static List<UserBehavior> getData() {
  37.         ArrayList<UserBehavior> userBehaviors = new ArrayList<UserBehavior>();
  38.         try {
  39.             BufferedReader br = new BufferedReader(new FileReader(new File("data/UserBehavior.csv")));
  40.             String line = "";
  41.             while ((line = br.readLine()) != null) {
  42.                 String[] split = line.split(",");
  43.              userBehaviors.add( new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4])));
  44.             }
  45.         } catch (Exception e) {
  46.             e.printStackTrace();
  47.         }
  48.         return userBehaviors;
  49.     }
  50. }
复制代码


注意:value.serializer 一定要指定我们自己写好的那个反序列化类,负责会无效

4.4 创建反序列化对象

  1. package com.avro.kafka;
  2. import com.avro.bean.UserBehavior;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import java.util.Arrays;
  7. import java.util.Properties;
  8. /**
  9. * @author 大数据老哥
  10. * @version V1.0
  11. * @Package com.avro.kafka
  12. * @File :UserBehaviorConsumer.java
  13. * @date 2021/1/8 20:58
  14. */
  15. public class UserBehaviorConsumer {
  16.     public static void main(String[] args) {
  17.         Properties prop = new Properties();
  18.         prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
  19.         prop.put("group.id", "UserBehavior");
  20.         prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  21.         // 设置反序列化类为自定义的avro反序列化类
  22.         prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");
  23.         KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer<String, UserBehavior>(prop);
  24.         consumer.subscribe(Arrays.asList("UserBehaviorKafka"));
  25.         while (true) {
  26.             ConsumerRecords<String, UserBehavior> poll = consumer.poll(1000);
  27.             for (ConsumerRecord<String, UserBehavior> stringStockConsumerRecord : poll) {
  28.                 System.out.println(stringStockConsumerRecord.value());
  29.             }
  30.         }
  31.     }
  32. }
复制代码

4.5 启动运行

创建kafkaTopic 和启动一个消费者
  1. <div># 创建topic
  2. ./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic UserBehaviorKafka
  3. </div><div>
  4. </div><div># 模拟消费者
  5. ./kafka-console-consumer.sh --from-beginning --topic UserBehaviorKafka --zookeeper node01:2181,node02:2node03:2181</div>
复制代码


2021-08-10_173837.jpg

五、Flink 实现Avro自定义序列化到Kafka

到这里好多小伙们就说我Java实现了那Flink 不就改一下Consumer 和Producer 不就完了吗?

5.1 准备数据

  1.     543462,1715,1464116,pv,1511658000
  2.     662867,2244074,1575622,pv,1511658000
  3.     561558,3611281,965809,pv,1511658000
  4.     894923,3076029,1879194,pv,1511658000
  5.     834377,4541270,3738615,pv,1511658000
  6.     315321,942195,4339722,pv,1511658000
  7.     625915,1162383,570735,pv,1511658000
复制代码



5.2 创建Flink自定义Avro序列化和反序列化

当我们创建FlinkKafka连接器的时候发现使用Java那个类序列化发现不行,于是我们改为了系统自带的那个类进行测试。点击源码查看发系统自带的那个String其实实现的是DeserializationSchema和SerializationSchema,那我们是不是也可以模仿一个那?

2021-08-10_174027.jpg

  1. <div>package com.avro.AvroUtil;
  2. import com.avro.bean.UserBehavior;
  3. import com.typesafe.sslconfig.ssl.FakeChainedKeyStore;
  4. import org.apache.avro.io.BinaryDecoder;
  5. import org.apache.avro.io.BinaryEncoder;
  6. import org.apache.avro.io.DecoderFactory;
  7. import org.apache.avro.io.EncoderFactory;
  8. import org.apache.avro.specific.SpecificDatumReader;
  9. import org.apache.avro.specific.SpecificDatumWriter;
  10. import org.apache.flink.api.common.serialization.DeserializationSchema;
  11. import org.apache.flink.api.common.serialization.SerializationSchema;
  12. import org.apache.flink.api.common.typeinfo.TypeInformation;
  13. import org.apache.kafka.common.serialization.Deserializer;
  14. import org.apache.kafka.common.serialization.Serializer;
  15. import java.io.ByteArrayInputStream;
  16. import java.io.ByteArrayOutputStream;
  17. import java.io.IOException;
  18. import java.util.Map;
  19. /**
  20. * @author 大数据老哥
  21. * @version V1.0
  22. * @Package com.avro.AvroUtil
  23. * @File :SimpleAvroSchemaFlink.java
  24. * @date 2021/1/8 20:02
  25. */
  26. /**
  27. *  自定义序列化和反序列化
  28. */
  29. public class SimpleAvroSchemaFlink implements DeserializationSchema<UserBehavior>, SerializationSchema<UserBehavior> {
  30.     @Override
  31.     public byte[] serialize(UserBehavior userBehavior) {
  32.         // 创建序列化执行器
  33.         SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
  34.         // 创建一个流 用存储序列化后的二进制文件
  35.         ByteArrayOutputStream out = new ByteArrayOutputStream();
  36.         // 创建二进制编码器
  37.         BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
  38.         try {
  39.             // 数据入都流中
  40.             writer.write(userBehavior, encoder);
  41.         } catch (IOException e) {
  42.             e.printStackTrace();
  43.         }
  44.         return out.toByteArray();
  45.     }
  46.     @Override
  47.     public TypeInformation<UserBehavior> getProducedType() {
  48.       return TypeInformation.of(UserBehavior.class);
  49.     }
  50.     @Override
  51.     public UserBehavior deserialize(byte[] bytes) throws IOException {
  52.         // 用来保存结果数据
  53.         UserBehavior userBehavior = new UserBehavior();
  54.         // 创建输入流用来读取二进制文件
  55.         ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
  56.         // 创建输入序列化执行器
  57.         SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
  58.         //创建二进制解码器
  59.         BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
  60.         try {
  61.             // 数据读取
  62.             userBehavior=stockSpecificDatumReader.read(null, binaryDecoder);
  63.         } catch (IOException e) {
  64.             e.printStackTrace();
  65.         }
  66.         // 结果返回
  67.         return userBehavior;
  68.     }
  69.     @Override
  70.     public boolean isEndOfStream(UserBehavior userBehavior) {
  71.         return false;
  72.     }
  73. }
  74. </div>
复制代码



5.3 创建Flink Comsumer 反序列化

  1. package com.avro.FlinkKafka
  2. import com.avro.AvroUtil.{SimpleAvroSchemaFlink}
  3. import com.avro.bean.UserBehavior
  4. import org.apache.flink.streaming.api.scala._
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
  6. import java.util.Properties
  7. /**
  8. * @Package com.avro.FlinkKafka
  9. * @File :UserBehaviorConsumerFlink.java
  10. * @author 大数据老哥
  11. * @date 2021/1/8 21:18
  12. * @version V1.0
  13. */
  14. object UserBehaviorConsumerFlink {
  15.   def main(args: Array[String]): Unit = {
  16.     //1.构建流处理运行环境
  17.     val env = StreamExecutionEnvironment.getExecutionEnvironment
  18.     env.setParallelism(1) // 设置并行度1 方便后面测试
  19.     // 2.设置kafka 配置信息
  20.     val prop = new Properties
  21.     prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092")
  22.     prop.put("group.id", "UserBehavior")
  23.     prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  24.     // 设置反序列化类为自定义的avro反序列化类
  25.     prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaFlink")
  26.     //    val kafka: FlinkKafkaConsumer011[String] =  new FlinkKafkaConsumer011[String]("UserBehaviorKafka", new SimpleStringSchema(), prop)
  27.     // 3.构建Kafka 连接器
  28.     val kafka: FlinkKafkaConsumer011[UserBehavior] = new FlinkKafkaConsumer011[UserBehavior]("UserBehavior", new SimpleAvroSchemaFlink(), prop)
  29.     //4.设置Flink层最新的数据开始消费
  30.     kafka.setStartFromLatest()
  31.     //5.基于kafka构建数据源
  32.     val data: DataStream[UserBehavior] = env.addSource(kafka)
  33.     //6.结果打印
  34.     data.print()
  35.     env.execute("UserBehaviorConsumerFlink")
  36.   }
  37. }
复制代码


5.4 创建Flink Producer 序列化

  1. package com.avro.FlinkKafka
  2. import com.avro.AvroUtil.SimpleAvroSchemaFlink
  3. import com.avro.bean.UserBehavior
  4. import org.apache.flink.streaming.api.scala._
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
  6. import java.util.Properties
  7. /**
  8. * @Package com.avro.FlinkKafka
  9. * @File :UserBehaviorProducerFlink.java
  10. * @author 大数据老哥
  11. * @date 2021/1/8 21:38
  12. * @version V1.0
  13. */
  14. object UserBehaviorProducerFlink {
  15.   def main(args: Array[String]): Unit = {
  16.     val env = StreamExecutionEnvironment.getExecutionEnvironment
  17.     val value = env.readTextFile("./data/UserBehavior.csv")
  18.     val users: DataStream[UserBehavior] = value.map(row => {
  19.       val arr = row.split(",")
  20.       val behavior = new UserBehavior()
  21.       behavior.setUserId(arr(0).toLong)
  22.       behavior.setItemId(arr(1).toLong)
  23.       behavior.setCategoryId(arr(2).toInt)
  24.       behavior.setBehavior(arr(3))
  25.       behavior.setTimestamp(arr(4).toLong)
  26.       behavior
  27.     })
  28.     val prop = new Properties()
  29.     prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
  30.     //4.连接Kafka
  31.     val producer: FlinkKafkaProducer011[UserBehavior] = new FlinkKafkaProducer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)
  32.     //5.将数据打入kafka
  33.     users.addSink(producer)
  34.     //6.执行任务
  35.     env.execute("UserBehaviorProducerFlink")
  36.   }
  37. }
复制代码

5.5 启动运行
2021-08-10_174149.jpg


需要源码的请去GitHub 自行下载 https://github.com/lhh2002/Flink_Avro

小结


其实我在实现这个功能的时候也是蒙的,不会难道就不学了吗,肯定不是呀。我在5.2提出的那个问题的时候其实是我自己亲身经历过的。首先遇到了问题不要想着怎么放弃,而是想想怎么解决,当时我的思路看源码看别人写的。最后经过不懈的努力也终成功了,我在这里为大家提供Flink面试题需要的朋友可以去下面GitHub去下载,信自己,努力和汗水总会能得到回报的。我是大数据老哥,我们下期见~~~

作者:大数据老哥
来源:https://blog.csdn.net/qq_43791724/article/details/112371170

最新经典文章,欢迎关注公众号



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条