分享

在线等!spark streaming整合kafka 无法消费到topic数据

醉半城 发表于 2017-7-6 14:51:57 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 13 21970
环境:jar:spark-streaming-kafka-0-10_2.11

spark:2.1.0
kafka:0.10.0
scala:2.11.8

代码如下:[mw_shl_code=scala,true]def main(args: Array[String]) {

    val topics = Array("test")
    val brokers = "master:9092,slave2:9092"

    val sparkConf = new SparkConf().setAppName("demo").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      rdd.foreachPartition(
        message => {
          while (message.hasNext) {
            println(message.next())
          }
        })
    }

    ssc.start()
    ssc.awaitTermination()
  }[/mw_shl_code]
使用kafka-console-consumer可以消费到数据。
各位大侠指点指点?

已有(12)人评论

跳转到指定楼层
醉半城 发表于 2017-7-6 14:54:17
补充下:
[mw_shl_code=scala,true]import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}[/mw_shl_code]
回复

使用道具 举报

langke93 发表于 2017-7-6 15:37:23
醉半城 发表于 2017-7-6 14:54
补充下:
[mw_shl_code=scala,true]import org.apache.kafka.common.serialization.StringDeserializer
i ...

.setMaster("local")master应该不是在本地吧。
回复

使用道具 举报

w123aw 发表于 2017-7-6 15:43:47
topic有多个吗?al topics = Array("test")

下面代码楼主可以对比下
[mw_shl_code=scala,true]import kafka.api.OffsetRequest
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by wangwei01 on 2017/3/12.
  */
object StreamingReadData {
  Logger.getRootLogger.setLevel(Level.WARN)
  def main(args: Array[String]): Unit = {

    // 创建SparkConf对象,并指定AppName和Master
    val conf = new SparkConf()
          .setAppName("StreamingReadData")
          .setMaster("local")

    // 创建StreamingContext对象
    val ssc = new StreamingContext(conf, Seconds(10))

//    val zkServers = "master:2181,slave1:2181,slave2:2181"
    // 注意:需要在本机的hosts文件中添加 master/slave1/slave2对应的ip
    val brokers = "master:9092,slave1:9092,slave2:9092"

    val topics = "aboutyunlog"
    val groupId = "consumer_001"

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> OffsetRequest.SmallestTimeString  // 说明每次程序启动,从kafka中最开始的第一条消息开始读取
    )

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet).map(_._2)

    messages.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
[/mw_shl_code]
回复

使用道具 举报

醉半城 发表于 2017-7-6 16:21:01
w123aw 发表于 2017-7-6 15:43
topic有多个吗?al topics = Array("test")

下面代码楼主可以对比下

我用的0.10.0版本。[mw_shl_code=applescript,true]def Subscribe[K, V](
      topics: Iterable[jl.String],
      kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
    new Subscribe[K, V](
      new ju.ArrayList(topics.asJavaCollection),
      new ju.HashMap[String, Object](kafkaParams.asJava),
      ju.Collections.emptyMap[TopicPartition, jl.Long]())
  }[/mw_shl_code]
需要迭代器,所以封装了下。只有一个topic。 感觉应该是环境的问题。
回复

使用道具 举报

醉半城 发表于 2017-7-6 16:21:54
langke93 发表于 2017-7-6 15:37
.setMaster("local")master应该不是在本地吧。

在window下测试的,所以用的local
回复

使用道具 举报

w123aw 发表于 2017-7-6 16:39:00
醉半城 发表于 2017-7-6 16:21
在window下测试的,所以用的local

建议先不封装,使用最简单的,先定位问题。
很可能是封装造成的。
回复

使用道具 举报

yuntian0215 发表于 2017-7-7 16:12:26
学习了,多多指教
回复

使用道具 举报

nextuser 发表于 2017-7-10 20:01:08
醉半城 发表于 2017-7-6 16:21
在window下测试的,所以用的local

配置文件贴出来看下
回复

使用道具 举报

醉半城 发表于 2017-7-13 17:15:44
发现,清除所有topic后,重新测试就OK了。。。。
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条