- http://mirror.bit.edu.cn/apache/kafka/
- In the package name, 2.10 is the Scala version and 0.8.1.1 is the Kafka version
Configure Kafka
1. Edit the configuration file config/server.properties
- # Hostname the broker will bind to. If not set, the server will bind to all interfaces
- host.name=localhost
- # Hostname the broker will advertise to producers and consumers. If not set, it uses the
- # value for "host.name" if configured. Otherwise, it will use the value returned from
- # java.net.InetAddress.getCanonicalHostName().
- advertised.host.name=localhost
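The fallback order described in the comments above can be sketched in Python. This is only an illustration of the lookup order, not Kafka code: `resolve_advertised_host` is a hypothetical helper, and `socket.getfqdn()` is an assumed stand-in for Java's `InetAddress.getCanonicalHostName()`.

```python
import socket


def resolve_advertised_host(props):
    """Return the hostname a broker would advertise, following the
    fallback order in the server.properties comments (illustrative only)."""
    # 1. An explicit advertised.host.name wins.
    advertised = props.get("advertised.host.name", "").strip()
    if advertised:
        return advertised
    # 2. Otherwise fall back to host.name, if it is configured.
    bind_host = props.get("host.name", "").strip()
    if bind_host:
        return bind_host
    # 3. Otherwise use the machine's canonical hostname
    #    (socket.getfqdn() is an assumed stand-in for the Java call).
    return socket.getfqdn()
```

With both properties set to `localhost`, as above, producers and consumers are told to connect to `localhost`, which only works for clients on the same machine.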
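2. Configure ZooKeeper
Besides installing and running a standalone ZooKeeper, Kafka can also use the ZooKeeper bundled in its distribution. If Kafka is to use the bundled ZooKeeper, that ZooKeeper must be started from Kafka's bin directory. Conversely, when a standalone ZooKeeper is used, there is no need to start the one shipped with Kafka. The ZooKeeper-related messages that appear in the log during Kafka startup show Kafka, acting as a ZooKeeper client, establishing its connection to the ZooKeeper server.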
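Whichever ZooKeeper is used, it helps to confirm that a server is actually listening before starting Kafka. A minimal sketch, assuming the server accepts ZooKeeper's four-letter `ruok` command (newer ZooKeeper releases may require it to be whitelisted) and listens on localhost:2181; `zk_is_ok` is a hypothetical helper name:

```python
import socket


def zk_is_ok(host="localhost", port=2181, timeout=2.0):
    """Send ZooKeeper's 'ruok' command and report whether the server
    answered 'imok'. Returns False on any connection error or timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"ruok")
            reply = b""
            # The server closes the connection after replying,
            # so read until 4 bytes arrive or the stream ends.
            while len(reply) < 4:
                data = sock.recv(4 - len(reply))
                if not data:
                    break
                reply += data
            return reply == b"imok"
    except OSError:
        return False
```

A `False` result only says that no healthy reply came back on that address; it does not distinguish a stopped server from a disabled command.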
- ############################# Zookeeper #############################
- # Zookeeper connection string (see zookeeper docs for details).
- # This is a comma separated host:port pairs, each corresponding to a zk
- # server. e.g. ",,".
- # You can also append an optional chroot string to the urls to specify the
- # root directory for all kafka znodes.
- # 2181 is ZooKeeper's clientPort
- zookeeper.connect=localhost:2181
- # Timeout in ms for connecting to zookeeper
- zookeeper.connection.timeout.ms=1000000
- config/producer.properties
- config/consumer.properties
- # Zookeeper connection string
- # comma separated host:port pairs, each corresponding to a zk
- # server. e.g. ",,"
- zookeeper.connect=
- # timeout in ms for connecting to zookeeper
- zookeeper.connection.timeout.ms=1000000
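The format of `zookeeper.connect` (comma-separated host:port pairs plus an optional chroot suffix) can be made concrete with a small parser. This is a sketch, not Kafka's or ZooKeeper's own parsing code: `parse_zk_connect` is a hypothetical helper, it requires an explicit port on every entry, and the host names `zk1`, `zk2` and the chroot `/kafka` in the usage note are made up.

```python
def parse_zk_connect(connect):
    """Split a ZooKeeper connection string into (servers, chroot).

    servers is a list of (host, port) tuples; chroot is the optional
    path suffix, or None when it is absent.
    """
    connect = connect.strip()
    if not connect:
        raise ValueError("zookeeper.connect is empty")
    # An optional chroot path starts at the first '/'.
    slash = connect.find("/")
    if slash == -1:
        hosts_part, chroot = connect, None
    else:
        hosts_part, chroot = connect[:slash], connect[slash:]
    servers = []
    for pair in hosts_part.split(","):
        host, sep, port = pair.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {pair!r}")
        servers.append((host, int(port)))
    return servers, chroot
```

For example, `parse_zk_connect("zk1:2181,zk2:2181/kafka")` yields two servers and the chroot `/kafka`, while an empty value, such as a `zookeeper.connect=` line that has not been filled in, raises `ValueError`.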
2. Start ZooKeeper; it is started with the following parameters:
- # The number of milliseconds of each tick
- tickTime=2000
- # The number of ticks that the initial
- # synchronization phase can take
- initLimit=10
- # The number of ticks that can pass between
- # sending a request and getting an acknowledgement
- syncLimit=5
- # the directory where the snapshot is stored.
- # do not use /tmp for storage, /tmp here is just
- # example sakes.
- dataDir=/home/hadoop/software/zookeeper-3.4.6/data
- # the port at which the clients will connect
- clientPort=2181
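Because `initLimit` and `syncLimit` are counted in ticks, it can help to convert them to milliseconds. The sketch below does that arithmetic for the values above; `zk_timing_ms` is a hypothetical helper. The session-timeout bounds of 2 and 20 ticks are ZooKeeper's defaults and are assumed not to be overridden here. Note that `initLimit` and `syncLimit` only matter in a replicated ensemble, not for a single standalone server.

```python
def zk_timing_ms(tick_time_ms, init_limit, sync_limit):
    """Convert tick-based ZooKeeper limits into milliseconds."""
    return {
        # Time followers may take for the initial sync with the leader.
        "init_timeout_ms": tick_time_ms * init_limit,
        # Time allowed between sending a request and getting an acknowledgement.
        "sync_timeout_ms": tick_time_ms * sync_limit,
        # Default session-timeout bounds: 2x and 20x tickTime.
        "min_session_timeout_ms": 2 * tick_time_ms,
        "max_session_timeout_ms": 20 * tick_time_ms,
    }


timing = zk_timing_ms(tick_time_ms=2000, init_limit=10, sync_limit=5)
```

With these settings the initial sync may take up to 20000 ms and the sync limit is 10000 ms. The 6000 ms session timeout that Kafka requests in the startup log falls inside the resulting 4000 to 40000 ms range, which is consistent with the negotiated timeout of 6000 reported there.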
- [hadoop@hadoop kafka_2.10-]$ bin/kafka-server-start.sh config/server.properties
2. Startup log
- [2015-01-11 01:11:12,490] INFO Verifying properties (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,558] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,558] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,558] INFO Property log.dirs is overridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,558] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,559] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,559] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,559] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,559] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,559] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,559] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,559] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,560] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,560] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,560] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,560] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
- [2015-01-11 01:11:12,607] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
- [2015-01-11 01:11:12,609] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
- [2015-01-11 01:11:12,640] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:host.name=hadoop.master (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:java.version=1.7.0_67 (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:java.home=/home/hadoop/software/jdk1.7.0_67/jre (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:java.class.path=:/home/hadoop/software/kafka_2.10-*.jar:/home/hadoop/software/kafka_2.10-*.jar:/home/hadoop/software/kafka_2.10-*.jar:/home/hadoop/software/kafka_2.10-*.jar:/home/hadoop/software/kafka_2.10-*.jar:/home/hadoop/software/kafka_2.10-*.jar:/home/hadoop/software/kafka_2.10-*.jar (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,640] INFO Client environment:user.dir=/home/hadoop/software/kafka_2.10- (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,641] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7a50a6d4 (org.apache.zookeeper.ZooKeeper)
- [2015-01-11 01:11:12,643] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
- [2015-01-11 01:11:12,706] INFO Opening socket connection to server localhost/ (org.apache.zookeeper.ClientCnxn)
- [2015-01-11 01:11:12,716] INFO Socket connection established to localhost/, initiating session (org.apache.zookeeper.ClientCnxn)
- [2015-01-11 01:11:12,756] INFO Session establishment complete on server localhost/, sessionid = 0x14ad79bb13d0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
- [2015-01-11 01:11:12,759] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
- [2015-01-11 01:11:12,919] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
- [2015-01-11 01:11:12,948] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
- [2015-01-11 01:11:12,975] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
- SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
- SLF4J: Defaulting to no-operation (NOP) logger implementation
- SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- [2015-01-11 01:11:13,039] INFO Awaiting socket connections on (kafka.network.Acceptor)
- [2015-01-11 01:11:13,063] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
- [2015-01-11 01:11:13,163] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
- [2015-01-11 01:11:13,219] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
- [2015-01-11 01:11:13,367] INFO Registered broker 0 at path /brokers/ids/0 with address hadoop.master:9092. (kafka.utils.ZkUtils$)
- [2015-01-11 01:11:13,379] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
- [2015-01-11 01:11:13,486] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
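A log this long is easier to check mechanically. The following sketch pulls out the "Property ... is overridden to ..." entries and the broker ids that reported "started". The regular expressions are written against the line shapes shown above and are an assumption about this Kafka version's log format; `summarize_startup` is a hypothetical helper.

```python
import re

# Matches e.g. "INFO Property broker.id is overridden to 0 (kafka.utils...)".
_OVERRIDE = re.compile(r"INFO Property (\S+) is overridden to (\S+) \(")
# Matches e.g. "INFO [Kafka Server 0], started (kafka.server.KafkaServer)".
_STARTED = re.compile(r"INFO \[Kafka Server (\d+)\], started \(")


def summarize_startup(lines):
    """Return (overrides, started): a dict of overridden properties and
    the list of broker ids that logged a 'started' message."""
    overrides, started = {}, []
    for line in lines:
        m = _OVERRIDE.search(line)
        if m:
            overrides[m.group(1)] = m.group(2)
            continue
        m = _STARTED.search(line)
        if m:
            started.append(int(m.group(1)))
    return overrides, started
```

Applied to the log above, the second element should contain broker id 0 once, since the "starting" line is deliberately not matched.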
- # Create a topic named test
- [hadoop@hadoop kafka_2.10-]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- Created topic "test".
- # List the existing topics; here there is only test
- [hadoop@hadoop kafka_2.10-]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
- test
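If topic creation needs to be scripted, the same command line can be assembled from Python. This is a sketch that only reuses the flags shown above; `create_topic_cmd` and `create_topic` are hypothetical helpers, and the relative script path assumes the working directory is the Kafka installation directory.

```python
import subprocess


def create_topic_cmd(topic, partitions=1, replication_factor=1,
                     zookeeper="localhost:2181",
                     script="bin/kafka-topics.sh"):
    """Build the argument list for the topic-creation command shown above."""
    if partitions < 1 or replication_factor < 1:
        raise ValueError("partitions and replication_factor must be >= 1")
    return [
        script, "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


def create_topic(topic, **kwargs):
    """Run the command; raises CalledProcessError on a non-zero exit code."""
    return subprocess.run(create_topic_cmd(topic, **kwargs), check=True)
```

Passing the arguments as a list avoids shell quoting issues with unusual topic names.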
2. Produce messages with the producer
- [hadoop@hadoop kafka_2.10-]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
- SLF4J: Defaulting to no-operation (NOP) logger implementation
- SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- This is mesage
- This is a test
3. Consume messages with the consumer
- [hadoop@hadoop kafka_2.10-]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
- SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
- SLF4J: Defaulting to no-operation (NOP) logger implementation
- SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- This is mesage
- This is a test
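The effect of `--from-beginning` can be pictured with a toy model: a topic partition behaves like an append-only log, and a consumer chooses the offset it starts reading from. The class below is an in-memory illustration only. `ToyTopic` is hypothetical and is not Kafka's API, and it assumes that a new console consumer started without `--from-beginning` begins at the end of the log.

```python
class ToyTopic:
    """In-memory stand-in for a single-partition topic: an append-only list."""

    def __init__(self):
        self._log = []

    def append(self, message):
        """Append a message and return the offset it was stored at."""
        self._log.append(message)
        return len(self._log) - 1

    def end_offset(self):
        """Offset that the next appended message will receive."""
        return len(self._log)

    def read(self, offset):
        """Return all messages stored at or after the given offset."""
        return self._log[offset:]


topic = ToyTopic()
topic.append("This is mesage")
topic.append("This is a test")

# Like --from-beginning: start at offset 0 and see both messages.
from_beginning = topic.read(0)
# Starting at the end of the log: nothing until new messages arrive.
latest_only = topic.read(topic.end_offset())
```

Here `from_beginning` holds both messages typed into the producer, while `latest_only` is empty, which mirrors why the consumer above needed `--from-beginning` to print messages sent before it started.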