分享

【kafka运维】Topic的生产和发送运维脚本

问题导读

1.生产者如何生产无key消息?
2.如何实现持续批量推送消息?
3.如何实现持续批量拉取消息?



一.Topic的发送kafka-console-producer.sh
1.1 生产无key消息
  1. ## 生产者
  2. bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties
复制代码

1.2 生产有key消息

加上属性--property parse.key=true
  1. ## 生产者
  2. bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true
复制代码

默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)
可选参数

a1.png
a2.png
a3.png


二. Topic的消费kafka-console-consumer.sh
1. 新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)
下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费
  1. sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
复制代码

2. 正则表达式匹配topic进行消费--whitelist
消费所有的topic
  1. sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’
复制代码

消费所有的topic,并且还从头消费
  1. sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning
复制代码

3.显示key进行消费--property print.key=true
  1. sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true
复制代码

4. 指定分区消费--partition 指定起始偏移量消费--offset
  1. sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100
复制代码

5. 给客户端命名--group
注意给客户端命名之后,如果之前有过消费,那么--from-beginning就不会再从头消费了
  1. sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group
复制代码

6. 添加客户端属性--consumer-property
这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行
  1. sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group
复制代码


7. 添加客户端属性--consumer.config
跟--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config
  1. sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
复制代码

a4.png
a5.png
a6.png


三. 持续批量推送消息kafka-verifiable-producer.sh
单次发送100条消息--max-messages 100
一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置
  1. sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100
复制代码

每秒发送最大吞吐量不超过消息 --throughput 100
推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制
  1. sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100
复制代码

发送的消息体带前缀--value-prefix
  1. sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666
复制代码

注意--value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号. 例如: 666.

其他参数:
--producer.config CONFIG_FILE 指定producer的配置文件
--acks ACKS 每次推送消息的ack值,默认是-1

四. 持续批量拉取消息kafka-verifiable-consumer
持续消费
  1. sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4
复制代码

单次最大消费10条消息--max-messages 10
  1. sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10
复制代码

相关可选参数

a7.png      



最新经典文章,欢迎关注公众号



已有(1)人评论

跳转到指定楼层
我的土豆不是梦8 发表于 2021-8-6 07:50:42
脚本命令因不同kafka版本而不同,楼主需指出版本号
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条