
Kafka error: the data stream gets closed automatically

Wyy_Ck posted on 2017-9-9 22:34:27
[root@master bin]# ./kafka-console-consumer.sh --zookeeper  master:2181 --topic shuaigeck --from-beginning
[2017-09-07 13:04:39,044] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

It looks like Kafka closed the connection on its own.

The broker also keeps printing:
[2017-09-07 12:53:14,542] INFO Socket connection established to 192.168.86.134/192.168.86.134:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-09-07 12:53:14,882] INFO Unable to reconnect to ZooKeeper service, session 0x35e5d0306110001 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
[2017-09-07 12:53:14,882] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)
[2017-09-07 12:53:14,882] INFO Initiating client connection, connectString=192.168.86.132:2181,192.168.86.133:2181,192.168.86.134:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@92d4709 (org.apache.zookeeper.ZooKeeper)
[2017-09-07 12:53:14,886] INFO Opening socket connection to server 192.168.86.132/192.168.86.132:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2017-09-07 12:53:14,928] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
[2017-09-07 12:53:14,929] INFO Socket connection established to 192.168.86.132/192.168.86.132:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2017-09-07 12:53:15,266] INFO Session establishment complete on server 192.168.86.132/192.168.86.132:2181, sessionid = 0x35e5d0306110012, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2017-09-07 12:53:15,267] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2017-09-07 12:53:15,637] INFO re-registering broker info in ZK for broker 1 (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2017-09-07 12:53:15,679] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-09-07 12:53:15,689] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-09-07 12:53:15,690] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(192.168.86.134,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2017-09-07 12:53:15,695] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2017-09-07 12:53:15,724] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
[2017-09-07 12:53:15,873] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2017-09-07 12:53:16,255] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions  (kafka.server.ReplicaFetcherManager)
[2017-09-07 12:53:16,302] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions List() (kafka.server.ReplicaFetcherManager)
[2017-09-07 12:53:16,321] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [shuaige,0] (kafka.server.ReplicaFetcherManager)
[2017-09-07 12:54:46,227] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-09-07 13:04:46,226] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)







2 replies
nextuser posted on 2017-9-10 08:21:54
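Kafka 0.9.0+ brokers actively close idle connections after a period of time; the default is 10 minutes.
To switch off the disconnect messages, add the setting below when creating the Kafka instance with librdkafka. Note that `log.connection.close` only controls whether librdkafka logs broker disconnects; it does not keep the connection open.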
[mw_shl_code=bash,true]/* Stop librdkafka from logging broker disconnects. */
if (rd_kafka_conf_set(rk_conf, "log.connection.close", "false", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    writeLog("set log.connection.close failed.\n");
}

/* On success, rk takes ownership of rk_conf. */
rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr));[/mw_shl_code]
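
For reference, the 10-minute idle timeout mentioned above corresponds, as far as I know, to the broker setting `connections.max.idle.ms` (default 600000 ms). Below is a minimal sketch of raising it on the broker side. The file path and the 30-minute value are assumptions for illustration only, not something taken from the original post, and whether this helps with the console consumer in the question is not confirmed.

```properties
# config/server.properties (path assumed; adjust to your installation)
# The broker closes connections that stay idle longer than this many milliseconds.
# Default is 600000 (10 minutes); 1800000 (30 minutes) is only an example value.
connections.max.idle.ms=1800000
```

The broker typically has to be restarted before a change to this file takes effect.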


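Wyy_Ck posted on 2017-9-10 12:17:36
nextuser posted on 2017-9-10 08:21
Kafka 0.9.0+ brokers actively close idle connections after a period of time; the default is 10 minutes.
To switch this off: when creating with librdkafka ...

OK, I'll give it a try. Thanks.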