醉半城 发表于 2017-7-6 14:51:57

在线等!spark streaming整合kafka 无法消费到topic数据

环境:jar:spark-streaming-kafka-0-10_2.11

spark:2.1.0
kafka:0.10.0
scala:2.11.8

代码如下:def main(args: Array) {

    val topics = Array("test")
    val brokers = "master:9092,slave2:9092"

    val sparkConf = new SparkConf().setAppName("demo").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf,
      "value.deserializer" -> classOf,
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      Subscribe(topics, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      rdd.foreachPartition(
      message => {
          while (message.hasNext) {
            println(message.next())
          }
      })
    }

    ssc.start()
    ssc.awaitTermination()
}
使用kafka-console-consumer可以消费到数据。
各位大侠指点指点?

醉半城 发表于 2017-7-6 14:54:17

补充下:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

langke93 发表于 2017-7-6 15:37:23

醉半城 发表于 2017-7-6 14:54
补充下:
import org.apache.kafka.common.serialization.StringDeserializer
i ...

.setMaster("local")master应该不是在本地吧。

w123aw 发表于 2017-7-6 15:43:47

topic有多个吗?al topics = Array("test")

下面代码楼主可以对比下
import kafka.api.OffsetRequest
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Created by wangwei01 on 2017/3/12.
*/
object StreamingReadData {
Logger.getRootLogger.setLevel(Level.WARN)
def main(args: Array): Unit = {

    // 创建SparkConf对象,并指定AppName和Master
    val conf = new SparkConf()
          .setAppName("StreamingReadData")
          .setMaster("local")

    // 创建StreamingContext对象
    val ssc = new StreamingContext(conf, Seconds(10))

//    val zkServers = "master:2181,slave1:2181,slave2:2181"
    // 注意:需要在本机的hosts文件中添加 master/slave1/slave2对应的ip
    val brokers = "master:9092,slave1:9092,slave2:9092"

    val topics = "aboutyunlog"
    val groupId = "consumer_001"

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map(
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> OffsetRequest.SmallestTimeString// 说明每次程序启动,从kafka中最开始的第一条消息开始读取
    )

    val messages = KafkaUtils.createDirectStream(
      ssc, kafkaParams, topicsSet).map(_._2)

    messages.print()

    ssc.start()
    ssc.awaitTermination()
}
}

醉半城 发表于 2017-7-6 16:21:01

w123aw 发表于 2017-7-6 15:43
topic有多个吗?al topics = Array("test")

下面代码楼主可以对比下


我用的0.10.0版本。def Subscribe(
      topics: Iterable,
      kafkaParams: collection.Map): ConsumerStrategy = {
    new Subscribe(
      new ju.ArrayList(topics.asJavaCollection),
      new ju.HashMap(kafkaParams.asJava),
      ju.Collections.emptyMap())
}
需要迭代器,所以封装了下。只有一个topic。 感觉应该是环境的问题。

醉半城 发表于 2017-7-6 16:21:54

langke93 发表于 2017-7-6 15:37
.setMaster("local")master应该不是在本地吧。

在window下测试的,所以用的local

w123aw 发表于 2017-7-6 16:39:00

醉半城 发表于 2017-7-6 16:21
在window下测试的,所以用的local

建议先不封装,使用最简单的,先定位问题。
很可能是封装造成的。

yuntian0215 发表于 2017-7-7 16:12:26

学习了,多多指教

nextuser 发表于 2017-7-10 20:01:08

醉半城 发表于 2017-7-6 16:21
在window下测试的,所以用的local

配置文件贴出来看下

醉半城 发表于 2017-7-13 17:15:44

发现,清除所有topic后,重新测试就OK了。。。。
页: [1] 2
查看完整版本: 在线等!spark streaming整合kafka 无法消费到topic数据