Joker 发表于 2016-9-17 18:28 这个是consumer的部分 public void RecieveMessage() { Map<String, Integer> topicCount = new HashMap<String, Integer>(); //定义订阅topic数量 topicCount.put(topic, new Integer(1)); //返回的是所有topic的Map Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); //取出我们要需要的topic中的消息流 List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); //System.out.println("ready"); for (final KafkaStream<byte[], byte[]> stream : streams) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) { String msg=new String(consumerIte.next().message()); this.rcvAmnout+=msg.length(); System.out.println("c:"+msg+" "+msg.length()); } } if (consumer != null) consumer.shutdown(); } |
程序贴出来看下 |
读取完毕,一般情况下,是不会删除数据的。到达一定时间后,kafka会自动删除。 如果强制删除也可以。 kafka删除topic方法 1) kafka-topics.sh --delete --zookeeper host:port --topic topicname 2) 删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录删除zookeeper "/brokers/topics/"目录下相关topic节点 读取后,可以通过Java代码来删除。 |