Yes, it can: for Kafka 0.10, Spark provides integration both through the Structured Streaming connector and through the DStream-based spark-streaming-kafka-0-10 direct API, for reading data from and writing data to Kafka; the examples below use the DStream API.
You can start consuming either from the latest offset or from a specified offset.
Consuming from the latest offset
[mw_shl_code=scala,true]import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.TaskContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

def main(args: Array[String]): Unit = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    // With no committed offset for this group, start from the latest offset
    "auto.offset.reset" -> "latest",
    // Commit offsets manually after processing instead of auto-committing
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  // OpContext.sc is assumed to hold an already-created SparkContext
  val ssc = new StreamingContext(OpContext.sc, Seconds(2))
  val topics = Array("test")
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
  stream.foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition(iter => {
      // Each RDD partition maps 1:1 to a Kafka partition's offset range
      val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    })
    // Commit the processed offsets back to Kafka asynchronously
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })
  // stream.map(record => (record.key, record.value)).print(1)
  ssc.start()
  ssc.awaitTermination()
}[/mw_shl_code]
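The answer above also mentions writing data to Kafka, which neither example shows. A minimal sketch of the usual pattern, creating a plain KafkaProducer per partition inside foreachRDD; the output topic name "output" and the serializer settings are assumptions for illustration, not part of the original code:
[mw_shl_code=scala,true]import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

stream.foreachRDD(rdd => {
  rdd.foreachPartition(iter => {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // One producer per partition: a producer built on the driver is not
    // serializable, so it cannot be shipped to executors
    val producer = new KafkaProducer[String, String](props)
    iter.foreach(record =>
      producer.send(new ProducerRecord[String, String]("output", record.key, record.value))
    )
    producer.close()
  })
})[/mw_shl_code]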
Consuming from a specified offset
[mw_shl_code=scala,true]import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

def main(args: Array[String]): Unit = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    // "auto.offset.reset" is not needed here: Assign starts from explicit offsets
    // "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  // OpContext.sc is assumed to hold an already-created SparkContext
  val ssc = new StreamingContext(OpContext.sc, Seconds(2))
  // Start partition 0 of topic "test" at offset 1100449855
  val fromOffsets = Map(new TopicPartition("test", 0) -> 1100449855L)
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
  )
  stream.foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (o <- offsetRanges) {
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
    // Commit the processed offsets back to Kafka asynchronously
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })
  // stream.map(record => (record.key, record.value)).print(1)
  ssc.start()
  ssc.awaitTermination()
}[/mw_shl_code]
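For comparison, here is a sketch of the same two starting points with the Structured Streaming connector documented in the first reference below; `startingOffsets` accepts "latest" or a JSON map of per-partition offsets. The topic and offset values are carried over from the examples above:
[mw_shl_code=scala,true]import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-structured").getOrCreate()

// Start from the latest offsets (the default for streaming queries)
val latest = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .load()

// Start from specific offsets: a JSON map of topic -> partition -> offset
val fromOffset = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", """{"test":{"0":1100449855}}""")
  .load()[/mw_shl_code]
Writing out is symmetric: per the linked guide, a DataFrame with a string value column (and optionally a key column) can be sent with writeStream.format("kafka").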
Recommended references
Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
http://spark.apachecn.org/docs/c ... ka-integration.html
Spark 2.11: Two Kinds of Streaming Operations + Kafka
http://blog.51cto.com/13064681/1943431