Kafka error: the data stream gets closed automatically
# ./kafka-console-consumer.sh --zookeeper master:2181 --topic shuaigeck --from-beginning
INFO : Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
It looks like Kafka is closing the connection on its own.
It also keeps printing the following:
INFO Socket connection established to 192.168.86.134/192.168.86.134:2181, initiating session (org.apache.zookeeper.ClientCnxn)
INFO Unable to reconnect to ZooKeeper service, session 0x35e5d0306110001 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)
INFO Initiating client connection, connectString=192.168.86.132:2181,192.168.86.133:2181,192.168.86.134:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@92d4709 (org.apache.zookeeper.ZooKeeper)
INFO Opening socket connection to server 192.168.86.132/192.168.86.132:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
INFO Socket connection established to 192.168.86.132/192.168.86.132:2181, initiating session (org.apache.zookeeper.ClientCnxn)
INFO Session establishment complete on server 192.168.86.132/192.168.86.132:2181, sessionid = 0x35e5d0306110012, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
INFO re-registering broker info in ZK for broker 1 (kafka.server.KafkaHealthcheck$SessionExpireListener)
INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(192.168.86.134,9092,PLAINTEXT) (kafka.utils.ZkUtils)
INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)
INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
INFO Removed fetcher for partitions (kafka.server.ReplicaFetcherManager)
INFO Added fetcher for partitions List() (kafka.server.ReplicaFetcherManager)
INFO Removed fetcher for partitions (kafka.server.ReplicaFetcherManager)
INFO : Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
INFO : Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
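Kafka 0.9.0+ brokers proactively close idle connections at regular intervals; the default idle timeout is 10 minutes.
To silence this, add a config setting when creating the Kafka instance with librdkafka. Note that setting log.connection.close to false only stops librdkafka from logging these broker disconnects; the broker still closes idle connections.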
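/* rk_conf is assumed to come from rd_kafka_conf_new(); errstr is a char buffer. */
if (rd_kafka_conf_set(rk_conf, "log.connection.close", "false", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    /* On failure, errstr holds a human-readable reason. */
    writeLog("set log.connection.close failed.\n");
}
/* On success, rd_kafka_new() takes ownership of rk_conf. */
rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr));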
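For reference, the 10-minute idle timeout mentioned above corresponds, as far as I know, to the broker-side setting `connections.max.idle.ms`. A minimal sketch of how it would appear in the broker's `server.properties`, assuming the stock default is in effect; the value shown is the default, not a recommendation:

```properties
# server.properties (broker side)
# The broker closes connections that have been idle longer than this many milliseconds.
# 600000 ms = 10 minutes, which is the default.
connections.max.idle.ms=600000
```

Raising this value on the broker would be an alternative to silencing the disconnect log on the client side.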
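nextuser posted on 2017-9-10 08:21
Kafka 0.9.0+ brokers proactively close idle connections at regular intervals; the default idle timeout is 10 minutes.
To turn this off: when calling librdkafka to cre ...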
OK, I'll give it a try. Thanks.