After enabling Kerberos authentication on Kafka, Logstash can no longer send data to it.
Starting the Kafka console consumer:
[root@hadoop5 ~]# kafka-console-consumer --topic syslog_3_4 --zookeeper hadoop3:2181,hadoop4:2181,hadoop5:2181
yields the following warning:
[mw_shl_code=xml,true][2016-06-16 12:37:57,240] WARN [console-consumer-34643_hadoop5-1466051869221-fa559637-leader-finder-thread], Failed to find leader for Set([syslog_3_4,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.BrokerEndPointNotAvailableException: End point PLAINTEXT not found for broker 80
at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:141)
at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:171)
at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:171)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.utils.ZkUtils.getAllBrokerEndPointsForChannel(ZkUtils.scala:171)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)[/mw_shl_code]
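The `End point PLAINTEXT not found for broker 80` line suggests that once Kerberos is on, the broker registers only a SASL endpoint in ZooKeeper, so any client still speaking PLAINTEXT cannot find a usable address. As a minimal sketch of a Kerberized console consumer, assuming Kafka 0.9+ with a SASL_PLAINTEXT listener (the JAAS path, keytab, and principal below are hypothetical placeholders, not values from this cluster):

[mw_shl_code=shell,true]# Client JAAS login config; keytab path and principal are placeholders.
cat > /etc/kafka/kafka_client_jaas.conf <<'EOF'
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka_client.keytab"
  principal="kafka-client@EXAMPLE.COM";
};
EOF

# Tell the client to speak SASL instead of PLAINTEXT.
cat > /etc/kafka/consumer.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
EOF

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"

# The new consumer (bootstrap-server) supports SASL; the old ZooKeeper-based
# consumer used above does not, regardless of configuration.
kafka-console-consumer --new-consumer --bootstrap-server hadoop5:9092 \
  --topic syslog_3_4 --consumer.config /etc/kafka/consumer.properties[/mw_shl_code]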
With Kerberos authentication turned off on Kafka, messages come through normally again.
How should Logstash be configured so that it can send to a Kafka cluster that requires Kerberos authentication?
Current Logstash configuration:
[mw_shl_code=xml,true]input {
  tcp {
    codec => json_lines
    mode => "server"
    host => "0.0.0.0"
    port => 514
  }
}

output {
  kafka {
    topic_id => "syslog_3_4"
    bootstrap_servers => "hadoop5:9092,hadoop3:9092,hadoop4:9092"
  }
}[/mw_shl_code]
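Not a definitive answer, but a sketch of the direction that usually works: newer versions of the kafka output plugin (those built on the Kafka 0.9+ Java client) expose `security_protocol`, `sasl_kerberos_service_name`, `jaas_path`, and `kerberos_config` options. The file paths, keytab, and principal below are hypothetical placeholders:

[mw_shl_code=xml,true]output {
  kafka {
    topic_id => "syslog_3_4"
    bootstrap_servers => "hadoop5:9092,hadoop3:9092,hadoop4:9092"
    security_protocol => "SASL_PLAINTEXT"                  # must match the broker listener
    sasl_kerberos_service_name => "kafka"                  # service name of the broker principals
    jaas_path => "/etc/logstash/kafka_client_jaas.conf"    # JAAS login config (placeholder path)
    kerberos_config => "/etc/krb5.conf"                    # realm configuration
  }
}[/mw_shl_code]

with a JAAS file along these lines (keytab and principal are placeholders):

[mw_shl_code=xml,true]KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/logstash.keytab"
  principal="logstash@EXAMPLE.COM";
};[/mw_shl_code]

Note that older kafka output plugins built on the Kafka 0.8 producer have no SASL support at all, so upgrading Logstash or the plugin may be a prerequisite.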