I took another look: the problem was my jar and my spark-submit command.
My Spark is 2.3.0 and my Kafka is 2.11-1.1.0, so I should have been using the 0.10 integration package, but I was using the 0.8 one.
The code structure also has to change for the 0.10 API. After rewriting it the job runs, but Spark still can't read any data from Kafka.
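For reference, with Spark 2.3.0 built against Scala 2.11 the matching 0.10 integration artifact is spark-streaming-kafka-0-10_2.11, version 2.3.0. A minimal sbt dependency for it (assuming an sbt build; the same coordinates also work with spark-submit --packages) looks like:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"

Here is the code after switching to the 0.10 API: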
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object Nginx {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Nginx").setMaster("local[2]")
    // Build the StreamingContext from the SparkConf (the original passed an undefined `sc`)
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("kafkatext")
    val groupId = "con-consumer-group"
    val brokers = "192.168.19.13:9092"
    // Consumer configuration for the 0.10 direct stream
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Direct stream over the topic; each record is a ConsumerRecord[String, String]
    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Keep only the message value of each record
    val nginx = lines.map(record => record.value())
    // Split each value on tabs, then pull fields 2, 29 and 27 out of each space-separated piece
    val fields = nginx.flatMap(line => line.split("\\t").map(part => (part.split(" ")(2), part.split(" ")(29), part.split(" ")(27))))
    fields.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
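One more guess about why nothing comes through: with "auto.offset.reset" -> "latest" the consumer only sees records produced after the job starts, so anything already sitting in the topic is skipped. If the topic isn't being written to while the job runs, changing that one entry (an assumption on my part, not a confirmed fix) would replay it from the beginning:

      "auto.offset.reset" -> "earliest",  // read existing records instead of only new ones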