I wrote a simple example of a Kafka producer sending messages and a consumer consuming them.
The Producer code is as follows:
[mw_shl_code=java,true]import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        int events = 1000;
        // Set configuration properties
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
        // Enable the acknowledgement mechanism; otherwise sends are fire-and-forget
        // and data may be lost. Valid values are 0, 1 and -1; see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        // Create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // Generate and send messages
        for (int i = 0; i < events; i++) {
            String ts = String.valueOf(System.currentTimeMillis());
            String key1 = "" + i;
            String msg1 = ts;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                    "topic1", key1, msg1);
            producer.send(data);
            System.out.println(i + "\t" + msg1);
            Thread.sleep(2000);
        }
        // Close the producer
        producer.close();
    }
}[/mw_shl_code]
The Consumer code is as follows:
[mw_shl_code=java,true]import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
    public static void main(String[] args) {
        // Specify some consumer properties
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("zookeeper.connectiontimeout.ms", "1000000");
        props.put("group.id", "test_group");
        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig);
        // Choose the topic: request one stream for topic "topic1"
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("topic1", new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("topic1");
        // Create a single-thread pool to consume the stream
        ExecutorService threadPool = Executors.newFixedThreadPool(1);
        // Consume the messages
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            threadPool.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext())
                        System.out.println("topic: " + new String(it.next().topic()) +
                                "\t key:" + new String(it.next().key()) +
                                "\t content:" + new String(it.next().message()));
                }
            });
        }
    }
}[/mw_shl_code]
In the Producer, the message keys increment with i, and the printed output looks like this:
[screenshot: producer output]
But the Consumer does not receive all of the messages:
[screenshot: consumer output]
Can anyone explain why the consumer reads messages three apart? In the data files saved on the broker I can see that topic1-0 is complete, so publishing succeeded; why then is the subscription incomplete?
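For reference, one likely explanation: each call to it.next() on the ConsumerIterator advances to the next message, and the println in run() calls it.next() three times, so every printed line consumes three messages (the topic from one, the key from the next, and the content from a third). That would match keys jumping by three in the output. Below is a minimal sketch of the loop body with a single next() call per iteration, assuming the same 0.8 high-level consumer API, where next() returns a kafka.message.MessageAndMetadata:

[mw_shl_code=java,true]// Requires: import kafka.message.MessageAndMetadata;
// Call it.next() exactly once per iteration and reuse the returned
// record, so that one printed line corresponds to one consumed message.
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    MessageAndMetadata<byte[], byte[]> record = it.next();
    System.out.println("topic: " + record.topic() +
            "\t key:" + new String(record.key()) +
            "\t content:" + new String(record.message()));
}[/mw_shl_code]

With this change, the consumer should print every key in order, matching the producer's output.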