问题导读
1.生产者如何生产无key消息?
2.如何实现持续批量推送消息?
3.如何实现持续批量拉取消息?
一.Topic的发送kafka-console-producer.sh
1.1 生产无key消息
- ## 生产者
- bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties
复制代码
1.2 生产有key消息
加上属性--property parse.key=true
- ## 生产者
- bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true
复制代码
默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)
可选参数
二. Topic的消费kafka-console-consumer.sh
1. 新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)
下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费
- sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
复制代码
2. 正则表达式匹配topic进行消费--whitelist
消费所有的topic
- sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’
复制代码
消费所有的topic,并且还从头消费
- sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning
复制代码
3.显示key进行消费--property print.key=true
- sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true
复制代码
4. 指定分区消费--partition 指定起始偏移量消费--offset
- sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100
复制代码
5. 给客户端命名--group
注意给客户端命名之后,如果之前有过消费,那么--from-beginning就不会再从头消费了
- sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group
复制代码
6. 添加客户端属性--consumer-property
这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行
- sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group
复制代码
7. 添加客户端属性--consumer.config
跟--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config
- sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
复制代码
三. 持续批量推送消息kafka-verifiable-producer.sh
单次发送100条消息--max-messages 100
一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置
- sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100
复制代码
每秒发送最大吞吐量不超过消息 --throughput 100
推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制
- sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100
复制代码
发送的消息体带前缀--value-prefix
- sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666
复制代码
注意--value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号. 例如: 666.
其他参数:
--producer.config CONFIG_FILE 指定producer的配置文件
--acks ACKS 每次推送消息的ack值,默认是-1
四. 持续批量拉取消息kafka-verifiable-consumer
持续消费
- sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4
复制代码
单次最大消费10条消息--max-messages 10
- sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10
复制代码
相关可选参数
|