本帖最后由 丫丫 于 2016-10-14 18:53 编辑
问题导读
1、搭建kafka伪集群时,如何修改配置参数?
2、如何启动kafka?
3、如何构建卡夫卡集群?
4、如何编写producer?
5、如何编写consumer?
在搭建kafka 我是花费了一些时间,其实这个并不难,只是我想使用Docker 搭建遇到了一些宿主机荷容器通信问题,所以耽误了一段时间。下面我分享的是本地搭建Kafka伪集群。主要分为以下几步。
第一步,下载zookeeper 和kafka
第二步,解压并修改配置参数
第三步,构建kafka集群
第四步,编写Java代码
第一步,下载zookeeper 和kafka
第二步,解压并修改配置参数
解压下载的kafka 压缩包(如kafka_2.10-0.10.0.1.tgz)
解压命令
$ tar -zxvf kafka_2.10-0.10.0.1.tgz
解压后先启动zookeeper 然后在启动kafka,zookeeper的启动这里不做描述。
启动kafka
进入解压后的kafka文件路径下
(如果zookeeper和kafka是在同一台宿主机上则启动zookeeper后可直接启动kafka 命令如下,如果不在同一宿主机上则kafka需要修改config目录下server.properties的zookeeper.connect=localhost:2181 配置项,修改为zookeeper的地址 )
bin/kafka-server-start.sh config/server.properties
停止kafka 可直接使用control +C 或者执行
bin/kafka-server-stop.sh
kafka启动后我门可以新建一个topic (topic的名字为testyang,–replication-factor表示复制到多少个节点,–partitions表示分区数,一般都设置为2或与节点数相等,不能大于总节点数)
查看topic
创建一个producer 发送数据
创建一个consumer 消费数据
单机版已经可以正常运行使用了(注意此处zookeeper的地址,和kafka的地址,图中我的zookeeper 在宿主机中,kafka在docker 容器中。读者编写时,应以自己的实际地址为准)
第三步,构建kafka集群
(构建集群时我的kafka和zookeeper都在本机)
构建集群其实就是把server.properties 文件复制多份
server-1.properties
server-2.properties
修改其中部分参数,此处只列出修改的参数(由于是伪集群所以host.name相同,都为本机地址)
config/server.properties:中新增
[mw_shl_code=text,true]port=9092
host.name=192.168.0.101[/mw_shl_code] config/server-1.properties: [mw_shl_code=text,true]broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
host.name=192.168.0.101[/mw_shl_code] config/server-2.properties: [mw_shl_code=text,true]broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
host.name=192.168.0.101[/mw_shl_code]
注意:
真正集群要设置host.name和advertised.host.name这两个属性(博主感觉只要host.name就行了,没上业务,不好评论)host.name 一定要配成真实IP 如 192.168.0.101
然后打开三个终端分别启动三个broker(也可以在命令最后加&符号,让其在后台运行) [mw_shl_code=text,true]bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties[/mw_shl_code]
启动成功后创建一个topic (设置3个partition)
然后查看topic描述 在文章结尾附上两份kafka的参数配置详解
第四步,编写java代码
伪集群构建成功且创建topic好了以后,就可以编写java 客户端代码了。
创建一个maven工程加入pom依赖 [mw_shl_code=xml,true]<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>[/mw_shl_code]
编写producer
[mw_shl_code=java,true]package com.us.kafka;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
public class KafkaProducer extends Thread{
private String topic;
public KafkaProducer(String topic){
super();
this.topic = topic;
}
@Override
public void run() {
Producer producer = createProducer();
int i=0;
while(true){
producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private Producer createProducer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "192.168.0.101:2181");//声明zk
properties.put("serializer.class", StringEncoder.class.getName());
properties.put("metadata.broker.list", "192.168.0.101:9092,192.168.0.101:9093,192.168.0.101:9094");// 声明kafka broker ,要注意地址一定要正确
return new Producer<Integer, String>(new ProducerConfig(properties));
}
public static void main(String[] args) {
new KafkaProducer("test").start();// 使用kafka集群中创建好的主题 test
}
} [/mw_shl_code]
编写consumer
[mw_shl_code=java,true]package com.us.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class kafkaConsumer extends Thread{
private String topic;
public kafkaConsumer(String topic){
super();
this.topic = topic;
}
@Override
public void run() {
ConsumerConnector consumer = createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("接收到: " + message);
}
}
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");//声明zk
properties.put("group.id", "group1");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
properties.put("zookeeper.session.timeout.ms", "40000");
properties.put("zookeeper.sync.time.ms", "200");
properties.put("auto.commit.interval.ms", "1000");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
public static void main(String[] args) {
new kafkaConsumer("test").start();// 使用kafka集群中创建好的主题 test
}
} [/mw_shl_code]
启动produce 如图示
启动consumer
如图示
如果运行失败请注意错误信息,多半是因为地址配置错误,导致链接不上。如Fetching topic metadata with correlation id 0 for topics [Set(testyang)] from broker [BrokerEndPoint(0,192.168.65.2,9093)] failed
java.nio.channels.ClosedChannelException 为找不到broker ,要注意broker的地址是否正确,能否ping 通
作者:双斜杠少年 来源:csdn
|