Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
root@m1:/home/hadoop/kafka_2.9.2-0.8.1.1# cat config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
#整数,建议根据ip区分,这里我是使用zookeeper中的id来设置
broker.id=1
############################# Socket Server Settings #############################
# The port the socket server listens on
#broker用于接收producer消息的端口
port=9092
#port=44444
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#broker的hostname
host.name=m1
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#这个是配置PRODUCER/CONSUMER连上来的时候使用的地址
advertised.host.name=m1
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=2
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma seperated list of directories under which to store log files
#kafka存放消息文件的路径
log.dirs=/home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#topic的默认分区数
num.partitions=2
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
#kafka接收日志的存储目录(目前我们保存7天数据log.retention.hours=168)
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=m1:2181,m2:2181,s1:2181,s2:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
root@m1:/home/hadoop# [2014-08-05 10:03:11,210] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,261] INFO Property advertised.host.name is overridden to m1 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,261] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,264] INFO Property host.name is overridden to m1 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,264] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,264] INFO Property log.dirs is overridden to /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,265] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,265] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,265] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,265] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,266] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,266] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,267] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,267] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,268] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,268] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,268] INFO Property zookeeper.connect is overridden to m1:2181,m2:2181,s1:2181,s2:2181 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,269] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,302] INFO [Kafka Server 1], starting (kafka.server.KafkaServer)
[2014-08-05 10:03:11,303] INFO [Kafka Server 1], Connecting to zookeeper on m1:2181,m2:2181,s1:2181,s2:2181 (kafka.server.KafkaServer)
[2014-08-05 10:03:11,335] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2014-08-05 10:03:11,348] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,348] INFO Client environment:host.name=m1 (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,349] INFO Client environment:java.version=1.7.0_65 (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,349] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,349] INFO Client environment:java.home=/usr/lib/jvm/java-7-oracle/jre (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,349] INFO Client environment:java.class.path=.:/usr/lib/jvm/java-7-oracle/lib/tools.jar:/usr/lib/jvm/java-7-oracle/lib/dt.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/scala-library-2.9.2.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:java.library.path=:/usr/local/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,351] INFO Client environment:os.version=3.11.0-15-generic (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,351] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,351] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,351] INFO Client environment:user.dir=/home/hadoop (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,352] INFO Initiating client connection, connectString=m1:2181,m2:2181,s1:2181,s2:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@51f782b8 (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,380] INFO Opening socket connection to server m2/192.168.1.51:2181 (org.apache.zookeeper.ClientCnxn)
[2014-08-05 10:03:11,386] INFO Socket connection established to m2/192.168.1.51:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-08-05 10:03:11,398] INFO Session establishment complete on server m2/192.168.1.51:2181, sessionid = 0x247a3e09b460000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-08-05 10:03:11,400] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-08-05 10:03:11,652] INFO Loading log 'test-1' (kafka.log.LogManager)
[2014-08-05 10:03:11,681] INFO Recovering unflushed segment 0 in log test-1. (kafka.log.Log)
[2014-08-05 10:03:11,711] INFO Completed load of log test-1 with log end offset 137 (kafka.log.Log)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-08-05 10:03:11,747] INFO Loading log 'idoall.org-0' (kafka.log.LogManager)
[2014-08-05 10:03:11,748] INFO Recovering unflushed segment 0 in log idoall.org-0. (kafka.log.Log)
[2014-08-05 10:03:11,754] INFO Completed load of log idoall.org-0 with log end offset 5 (kafka.log.Log)
[2014-08-05 10:03:11,760] INFO Loading log 'test-0' (kafka.log.LogManager)
[2014-08-05 10:03:11,765] INFO Recovering unflushed segment 0 in log test-0. (kafka.log.Log)
[2014-08-05 10:03:11,777] INFO Completed load of log test-0 with log end offset 151 (kafka.log.Log)
[2014-08-05 10:03:11,779] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-08-05 10:03:11,782] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-08-05 10:03:11,800] INFO Awaiting socket connections on m1:9092. (kafka.network.Acceptor)
[2014-08-05 10:03:11,802] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
[2014-08-05 10:03:11,890] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-08-05 10:03:11,919] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-08-05 10:03:12,359] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-08-05 10:03:12,387] INFO Registered broker 1 at path /brokers/ids/1 with address m1:9092. (kafka.utils.ZkUtils$)
[2014-08-05 10:03:12,392] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
[2014-08-05 10:03:12,671] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall.org,0],[test,0],[test,1] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:03:12,741] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall.org,0],[test,0],[test,1] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:03:25,327] INFO Partition [test,0] on broker 1: Expanding ISR for partition [test,0] from 1 to 1,2 (kafka.cluster.Partition)
[2014-08-05 10:03:25,334] INFO Partition [test,1] on broker 1: Expanding ISR for partition [test,1] from 1 to 1,2 (kafka.cluster.Partition)
[2014-08-05 10:03:26,905] INFO Partition [test,1] on broker 1: Expanding ISR for partition [test,1] from 1,2 to 1,2,3 (kafka.cluster.Partition)
[2014-08-05 10:08:29,315] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall_testTopic,0] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:08:29,334] INFO Completed load of log idoall_testTopic-0 with log end offset 0 (kafka.log.Log)
[2014-08-05 10:08:29,384] WARN Partition [idoall_testTopic,0] on broker 1: No checkpointed highwatermark is found for partition [idoall_testTopic,0] (kafka.cluster.Partition)
[2014-08-05 10:08:29,415] INFO Completed load of log idoall_testTopic-1 with log end offset 0 (kafka.log.Log)
[2014-08-05 10:08:29,422] WARN Partition [idoall_testTopic,1] on broker 1: No checkpointed highwatermark is found for partition [idoall_testTopic,1] (kafka.cluster.Partition)
[2014-08-05 10:08:29,430] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall_testTopic,1] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:08:29,438] INFO Truncating log idoall_testTopic-1 to offset 0. (kafka.log.Log)
[2014-08-05 10:08:29,473] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer([[idoall_testTopic,1], initOffset 0 to broker id:2,host:m2,port:9092] ) (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:08:29,475] INFO [ReplicaFetcherThread-0-2], Starting (kafka.server.ReplicaFetcherThread)
[2014-08-05 10:38:30,870] WARN Partition [idoall,1] on broker 1: No checkpointed highwatermark is found for partition [idoall,1] (kafka.cluster.Partition)
[2014-08-05 10:38:30,878] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall,1] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:38:30,880] INFO Truncating log idoall-1 to offset 0. (kafka.log.Log)
[2014-08-05 10:38:30,885] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer([[idoall,1], initOffset 0 to broker id:3,host:s1,port:9092] ) (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:38:30,887] INFO [ReplicaFetcherThread-0-3], Starting (kafka.server.ReplicaFetcherThread)
2014-08-05 10:15:21,920 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@966] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2014-08-05 10:15:21,934 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@849] - Socket connection established to localhost/127.0.0.1:2181, initiating session
JLine support is enabled
2014-08-05 10:15:21,966 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1207] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x147a3e1246b0007, negotiated timeout = 30000
Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
这句我怎么看着别扭呢,如果是广播,每个consumer的CG不同,而每个CG只会把消息发给该CG中的一个consumer,那广播还能把消息发给所有的consumer吗?