topic有多个吗?val topics = Array("test")
下面代码楼主可以对比下
[mw_shl_code=scala,true]import kafka.api.OffsetRequest
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by wangwei01 on 2017/3/12.
*/
object StreamingReadData {
  // Runs when the object is initialised: the root logger only emits WARN and above.
  Logger.getRootLogger.setLevel(Level.WARN)

  /**
   * Entry point. Builds a local Spark Streaming context with a 10-second batch
   * duration, opens a Kafka direct stream on the configured topic(s), and prints
   * the message values of every batch until the job is terminated.
   *
   * @param args command-line arguments; not read by this program
   */
  def main(args: Array[String]): Unit = {
    // Spark configuration: application name plus a local master.
    val sparkConf = new SparkConf().setAppName("StreamingReadData").setMaster("local")
    val streamingContext = new StreamingContext(sparkConf, Seconds(10))

    // ZooKeeper quorum kept for reference only (unused): "master:2181,slave1:2181,slave2:2181"
    // Note: the local hosts file must map master/slave1/slave2 to their IP addresses.
    val brokerList = "master:9092,slave1:9092,slave2:9092"
    val consumerGroup = "consumer_001"
    // Comma-separated topic names turned into a set (a single topic here).
    val topicNames: Set[String] = "aboutyunlog".split(",").toSet

    val kafkaConfig: Map[String, String] = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> consumerGroup,
      // Per the original author's note: on every launch, start reading from the
      // earliest message available in Kafka.
      "auto.offset.reset" -> OffsetRequest.SmallestTimeString
    )

    // The direct stream yields (key, value) pairs; only the value is kept.
    val keyedStream =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        streamingContext,
        kafkaConfig,
        topicNames
      )
    val payloads = keyedStream.map { case (_, payload) => payload }

    payloads.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
[/mw_shl_code]
|