在线等!spark streaming整合kafka 无法消费到topic数据
环境:jar:spark-streaming-kafka-0-10_2.11spark:2.1.0
kafka:0.10.0
scala:2.11.8
代码如下:def main(args: Array) {
val topics = Array("test")
val brokers = "master:9092,slave2:9092"
val sparkConf = new SparkConf().setAppName("demo").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val kafkaParams = Map(
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf,
"value.deserializer" -> classOf,
"group.id" -> "example",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream(
ssc,
PreferConsistent,
Subscribe(topics, kafkaParams)
)
stream.foreachRDD { rdd =>
rdd.foreachPartition(
message => {
while (message.hasNext) {
println(message.next())
}
})
}
ssc.start()
ssc.awaitTermination()
}
使用kafka-console-consumer可以消费到数据。
各位大侠指点指点?
补充下:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext} 醉半城 发表于 2017-7-6 14:54
补充下:
import org.apache.kafka.common.serialization.StringDeserializer
i ...
.setMaster("local")master应该不是在本地吧。
topic有多个吗?al topics = Array("test")
下面代码楼主可以对比下
import kafka.api.OffsetRequest
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by wangwei01 on 2017/3/12.
*/
object StreamingReadData {
Logger.getRootLogger.setLevel(Level.WARN)
def main(args: Array): Unit = {
// 创建SparkConf对象,并指定AppName和Master
val conf = new SparkConf()
.setAppName("StreamingReadData")
.setMaster("local")
// 创建StreamingContext对象
val ssc = new StreamingContext(conf, Seconds(10))
// val zkServers = "master:2181,slave1:2181,slave2:2181"
// 注意:需要在本机的hosts文件中添加 master/slave1/slave2对应的ip
val brokers = "master:9092,slave1:9092,slave2:9092"
val topics = "aboutyunlog"
val groupId = "consumer_001"
val topicsSet = topics.split(",").toSet
val kafkaParams = Map(
"metadata.broker.list" -> brokers,
"group.id" -> groupId,
"auto.offset.reset" -> OffsetRequest.SmallestTimeString// 说明每次程序启动,从kafka中最开始的第一条消息开始读取
)
val messages = KafkaUtils.createDirectStream(
ssc, kafkaParams, topicsSet).map(_._2)
messages.print()
ssc.start()
ssc.awaitTermination()
}
}
w123aw 发表于 2017-7-6 15:43
topic有多个吗?al topics = Array("test")
下面代码楼主可以对比下
我用的0.10.0版本。def Subscribe(
topics: Iterable,
kafkaParams: collection.Map): ConsumerStrategy = {
new Subscribe(
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap(kafkaParams.asJava),
ju.Collections.emptyMap())
}
需要迭代器,所以封装了下。只有一个topic。 感觉应该是环境的问题。
langke93 发表于 2017-7-6 15:37
.setMaster("local")master应该不是在本地吧。
在window下测试的,所以用的local
醉半城 发表于 2017-7-6 16:21
在window下测试的,所以用的local
建议先不封装,使用最简单的,先定位问题。
很可能是封装造成的。
学习了,多多指教 醉半城 发表于 2017-7-6 16:21
在window下测试的,所以用的local
配置文件贴出来看下
发现,清除所有topic后,重新测试就OK了。。。。
页:
[1]
2