kafka如何清理topic数据

查看数: 92710 | 评论数: 3 | 收藏 0
关灯 | 提示:支持键盘翻页<-左 右->
    组图打开中,请稍候......
发布时间: 2016-9-14 11:20

正文摘要:

标题可能不太明确,具体是这样的,自己模拟消息的发送过程,如图所示 不知道这个是和kafka的机制有关还是啥 笨方法就是给消息加上时间戳,但是如果有办法把topic的消息清空的话也许更方便一些

回复

alu1105 发表于 2016-9-17 19:52:25
Joker 发表于 2016-9-17 18:28
程序贴出来看下

这个是consumer的部分   

public void RecieveMessage() {
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            //定义订阅topic数量
            topicCount.put(topic, new Integer(1));
            //返回的是所有topic的Map
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
            //取出我们要需要的topic中的消息流
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
            //System.out.println("ready");
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                    while (consumerIte.hasNext())
                    {
                            String msg=new String(consumerIte.next().message());
                            this.rcvAmnout+=msg.length();
                        System.out.println("c:"+msg+"  "+msg.length());
                    }
            }
            if (consumer != null)
                    consumer.shutdown();
    }

Joker 发表于 2016-9-17 18:28:33
程序贴出来看下
arsenduan 发表于 2016-9-14 15:19:16


读取完毕,一般情况下,是不会删除数据的。到达一定时间后,kafka会自动删除。
如果强制删除也可以。

kafka删除topic方法
1) kafka-topics.sh --delete --zookeeper host:port --topic topicname
2) 删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录删除zookeeper "/brokers/topics/"目录下相关topic节点


读取后,可以通过Java代码来删除。

关闭

推荐上一条 /2 下一条