分享

kafka开发步骤:集群搭建及功能实现

本帖最后由 丫丫 于 2016-10-14 18:53 编辑

问题导读

1、搭建kafka伪集群时,如何修改配置参数?
2、如何启动kafka?
3、如何构建卡夫卡集群?
4、如何编写producer?
5、如何编写consumer?









在搭建kafka 我是花费了一些时间,其实这个并不难,只是我想使用Docker 搭建遇到了一些宿主机荷容器通信问题,所以耽误了一段时间。下面我分享的是本地搭建Kafka伪集群。主要分为以下几步。
第一步,下载zookeeper 和kafka
第二步,解压并修改配置参数
第三步,构建kafka集群
第四步,编写Java代码



第一步,下载zookeeper 和kafka

zookeeper 的下砸和安装请参考zookeeper的安装


第二步,解压并修改配置参数

解压下载的kafka 压缩包(如kafka_2.10-0.10.0.1.tgz)

解压命令

$ tar -zxvf kafka_2.10-0.10.0.1.tgz
解压后先启动zookeeper 然后在启动kafka,zookeeper的启动这里不做描述。

启动kafka


进入解压后的kafka文件路径下
(如果zookeeper和kafka是在同一台宿主机上则启动zookeeper后可直接启动kafka 命令如下,如果不在同一宿主机上则kafka需要修改config目录下server.properties的zookeeper.connect=localhost:2181 配置项,修改为zookeeper的地址 )

bin/kafka-server-start.sh config/server.properties

停止kafka 可直接使用control +C 或者执行

bin/kafka-server-stop.sh

kafka启动后我门可以新建一个topic (topic的名字为testyang,–replication-factor表示复制到多少个节点,–partitions表示分区数,一般都设置为2或与节点数相等,不能大于总节点数)
1.png


查看topic
1.png


创建一个producer 发送数据
1.png


创建一个consumer 消费数据
1.png
单机版已经可以正常运行使用了(注意此处zookeeper的地址,和kafka的地址,图中我的zookeeper 在宿主机中,kafka在docker 容器中。读者编写时,应以自己的实际地址为准)


第三步,构建kafka集群

(构建集群时我的kafka和zookeeper都在本机)
构建集群其实就是把server.properties 文件复制多份
server-1.properties
server-2.properties


修改其中部分参数,此处只列出修改的参数(由于是伪集群所以host.name相同,都为本机地址)
config/server.properties:中新增
[mw_shl_code=text,true]port=9092
host.name=192.168.0.101[/mw_shl_code]
config/server-1.properties:
[mw_shl_code=text,true]broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
host.name=192.168.0.101[/mw_shl_code]
config/server-2.properties:
[mw_shl_code=text,true]broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
host.name=192.168.0.101[/mw_shl_code]


注意:
真正集群要设置host.name和advertised.host.name这两个属性(博主感觉只要host.name就行了,没上业务,不好评论)host.name 一定要配成真实IP 如 192.168.0.101


然后打开三个终端分别启动三个broker(也可以在命令最后加&符号,让其在后台运行)
[mw_shl_code=text,true]bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties[/mw_shl_code]


启动成功后创建一个topic (设置3个partition)

1.png


然后查看topic描述
1.png
在文章结尾附上两份kafka的参数配置详解


第四步,编写java代码



伪集群构建成功且创建topic好了以后,就可以编写java 客户端代码了。
创建一个maven工程加入pom依赖
[mw_shl_code=xml,true]<dependency>

               
<groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.10.0.0</version>
            </dependency>

            <dependency>
                <groupId>commons-logging</groupId>
                <artifactId>commons-logging</artifactId>
                <version>1.1.1</version>
            </dependency>[/mw_shl_code]


编写producer


[mw_shl_code=java,true]package com.us.kafka;
import java.util.Properties;
import java.util.concurrent.TimeUnit;  
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
import kafka.serializer.StringEncoder;  

public class KafkaProducer extends Thread{
     private String topic;  

        public KafkaProducer(String topic){  
            super();  
            this.topic = topic;  
        }  


        @Override  
        public void run() {  
            Producer producer = createProducer();  
            int i=0;  
            while(true){  
                producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));  
                try {  
                    TimeUnit.SECONDS.sleep(2);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  

        private Producer createProducer() {  
            Properties properties = new Properties();  
            properties.put("zookeeper.connect", "192.168.0.101:2181");//声明zk  
            properties.put("serializer.class", StringEncoder.class.getName());  
            properties.put("metadata.broker.list", "192.168.0.101:9092,192.168.0.101:9093,192.168.0.101:9094");// 声明kafka broker ,要注意地址一定要正确
            return new Producer<Integer, String>(new ProducerConfig(properties));  
         }  


        public static void main(String[] args) {  
            new KafkaProducer("test").start();// 使用kafka集群中创建好的主题 test   

        }  

    } [/mw_shl_code]



编写consumer


[mw_shl_code=java,true]package com.us.kafka;
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  


public class kafkaConsumer extends Thread{
    private String topic;  

    public kafkaConsumer(String topic){  
        super();  
        this.topic = topic;  
    }  


    @Override  
    public void run() {  
        ConsumerConnector consumer = createConsumer();  
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
        topicCountMap.put(topic, 1); // 一次从主题中获取一个数据  
         Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
         KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据  
         ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
         while(iterator.hasNext()){  
             String message = new String(iterator.next().message());  
             System.out.println("接收到: " + message);  
         }  
    }  

    private ConsumerConnector createConsumer() {  
        Properties properties = new Properties();  
        properties.put("zookeeper.connect", "localhost:2181");//声明zk  
        properties.put("group.id", "group1");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据  
        properties.put("zookeeper.session.timeout.ms", "40000");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
     }  

    public static void main(String[] args) {  
        new kafkaConsumer("test").start();// 使用kafka集群中创建好的主题 test   

    }      
}  [/mw_shl_code]


启动produce
如图示

1.png


启动consumer

如图示
1.png


如果运行失败请注意错误信息,多半是因为地址配置错误,导致链接不上。如Fetching topic metadata with correlation id 0 for topics [Set(testyang)] from broker [BrokerEndPoint(0,192.168.65.2,9093)] failed
java.nio.channels.ClosedChannelException
为找不到broker ,要注意broker的地址是否正确,能否ping 通



作者:双斜杠少年
来源:csdn



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条