分享

新手求教kafka

evababy 发表于 2015-9-2 10:13:34 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 13779
刚接触kafka,版本0.8.2 测试过程中存在几个疑问,望解答。

1、创建一个topic后,用JAVA程序正常接受,如何设置消息的超时时间?比如10秒后(与重发3次的时间关系?)没有有接收的消息自动废弃。
2、当多个消费者使用不同的group.id订阅同一个主题后,哪里可以看到这个主题的订阅信息?想知道到底有那些group.id, 翻了一些控制台命令并无结果。
3、KAFKA只保证一个partitions内的消息顺序,实际场景中有办法保证多个partitions的消息顺序么?有些场景必须按照顺序接收,如果通过KEY制定partitions,那么只用一个partitions会否必然会影响系能?
4、配置文件consumer.properties  producer.properties作用?只是作为程序的默认配置么? 还能能做到多个kafka之间的订阅(支持类似多broker-1.xx写法么)?

非常感谢

已有(5)人评论

跳转到指定楼层
xuanxufeng 发表于 2015-9-2 11:22:27
retention.ms

log.retention.minutes
数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
log.retention.bytes和log.retention.minutes达到要求,都会执行删除,会被topic创建时的指定参数覆盖


来源:
apache kafka中topic级别配置
http://www.aboutyun.com/thread-10882-1-1.html



回复

使用道具 举报

arsenduan 发表于 2015-9-2 11:49:16
个人观点只供交流,如有错误,欢迎指正
2、当多个消费者使用不同的group.id订阅同一个主题后,哪里可以看到这个主题的订阅信息?想知道到底有那些group.id, 翻了一些控制台命令并无结果。

group_id自己定义的


3、KAFKA只保证一个partitions内的消息顺序,实际场景中有办法保证多个partitions的消息顺序么?有些场景必须按照顺序接收,如果通过KEY制定partitions,那么只用一个partitions会否必然会影响系能?
如果多个可能就产生问题了,目前还没有发现其他好的方法。不行,看看其他技术

4、配置文件consumer.properties  producer.properties作用?只是作为程序的默认配置么? 还能能做到多个kafka之间的订阅(支持类似多broker-1.xx写法么)?

这个是系统定义的。
producer.properties文件:此文件放在/resources目录下
配置如:
[mw_shl_code=bash,true] #partitioner.class=
    ##broker列表可以为kafka server的子集,因为producer需要从broker中获取metadata
    ##尽管每个broker都可以提供metadata,此处还是建议,将所有broker都列举出来
    ##此值,我们可以在spring中注入过来
    ##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
    ##,127.0.0.1:9093
    ##同步,建议为async
    producer.type=sync
    compression.codec=0
    serializer.class=kafka.serializer.StringEncoder
    ##在producer.type=async时有效
    #batch.num.messages=100  [/mw_shl_code]


consumer.properties:文件位于/resources目录下
[mw_shl_code=bash,true] ## 此值可以配置,也可以通过spring注入
    ##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    ##,127.0.0.1:2182,127.0.0.1:2183
    # timeout in ms for connecting to zookeeper
    zookeeper.connectiontimeout.ms=1000000
    #consumer group id
    group.id=test-group
    #consumer timeout
    #consumer.timeout.ms=5000
    auto.commit.enable=true
    auto.commit.interval.ms=60000  [/mw_shl_code]

详细参考:

http://www.aboutyun.com/blog-1407-2725.html



回复

使用道具 举报

evababy 发表于 2015-9-2 12:15:32
本帖最后由 evababy 于 2015-9-2 17:09 编辑
xuanxufeng 发表于 2015-9-2 11:22
retention.ms

log.retention.minutes

非常感谢问题1已经解决

测试了两个参数retention.{ms,minutes,hours}、retention.bytes都可以达到想要的效果,如果被超时清理的消息在Consumer还可以接受到partitions数量的错误提示如下:
Current offset 420 for partition [sylvanas,2] out of range; reset offset to 440
……
说明topic sylvanas partitions 为2的从420移位到440,共计5条日志正好是移位数量等于测试发出的数据量

补充:
设置retention.{ms,minutes,hours}的同时也要考虑检查时间log.retention.check.interval.ms,如果检查时间大于超时时间,则造成超时时间=检查时间。
回复

使用道具 举报

evababy 发表于 2015-9-2 12:15:47
arsenduan 发表于 2015-9-2 11:49
个人观点只供交流,如有错误,欢迎指正
2、当多个消费者使用不同的group.id订阅同一个主题后,哪里可以看 ...

非常感谢,问题3、4明白了。

问题2:因为是测试过程,随便修改了很多个group.id进行测试,但最终不并清楚这个topic有那些定义的group,我是想通过某种手段获得有那些group,以便避免混淆
回复

使用道具 举报

evababy 发表于 2015-9-2 13:27:28
问题2:比较笨的版本可以变相查看group.id
通过登陆ZK
ls /XXX/consumers
可以获得整个KAFKA中有些group.id,再通过
ls /XXX/consumers/groupXX/offsets
得知这个group属于那个topic
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条