分享

kafka如何清理topic数据

alu1105 发表于 2016-9-14 11:20:02 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 92768
标题可能不太明确,具体是这样的,自己模拟消息的发送过程,如图所示
1.png
假如运行到此时程序停止,当再次运行时,consumer会再次收到最后一条消息:Pk5sd......
2.png
不知道这个是和kafka的机制有关还是啥
笨方法就是给消息加上时间戳,但是如果有办法把topic的消息清空的话也许更方便一些

已有(3)人评论

跳转到指定楼层
arsenduan 发表于 2016-9-14 15:19:16


读取完毕,一般情况下,是不会删除数据的。到达一定时间后,kafka会自动删除。
如果强制删除也可以。

kafka删除topic方法
1) kafka-topics.sh --delete --zookeeper host:port --topic topicname
2) 删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录删除zookeeper "/brokers/topics/"目录下相关topic节点


读取后,可以通过Java代码来删除。

回复

使用道具 举报

Joker 发表于 2016-9-17 18:28:33
程序贴出来看下
回复

使用道具 举报

alu1105 发表于 2016-9-17 19:52:25
Joker 发表于 2016-9-17 18:28
程序贴出来看下

这个是consumer的部分   

public void RecieveMessage() {
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            //定义订阅topic数量
            topicCount.put(topic, new Integer(1));
            //返回的是所有topic的Map
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
            //取出我们要需要的topic中的消息流
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
            //System.out.println("ready");
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                    while (consumerIte.hasNext())
                    {
                            String msg=new String(consumerIte.next().message());
                            this.rcvAmnout+=msg.length();
                        System.out.println("c:"+msg+"  "+msg.length());
                    }
            }
            if (consumer != null)
                    consumer.shutdown();
    }

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条