Scala code
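Both versions (Scala first, then Java) build a direct Kafka stream with explicitly supplied starting offsets: getOffset reads the consumer group's stored offsets via KafkaCluster and falls back to the earliest offsets still held by the brokers, and saveOffset writes the processed offsets back after every batch so a restarted job resumes where it left off.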
package org.apache.spark.test

import kafka.serializer.StringDecoder
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object TestDirectKafka2 {

  def main(args: Array[String]): Unit = {
    // Hard-coded broker list and topic for local testing; normally these come from args.
    val args2 = Array[String]("192.168.100.130:9091,192.168.100.130:9092", "test")
    if (args2.length < 2) {
      System.err.println("Invalid input arguments")
      System.exit(1)
    }
    val Array(brokers, topics) = args2
    val topicSet = Set(topics.split(","): _*)
    val groupId = "group1"

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1")
    // Graceful shutdown on kill -15 <pid>
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    // In yarn mode, run the driver only once instead of retrying on failure
    sparkConf.set("spark.yarn.maxAppAttempts", "1")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd
    val kc = new KafkaCluster(kafkaParams)

    // Read the stored offsets
    val fromOffsets: Map[TopicAndPartition, Long] = getOffset(kc, topicSet, groupId)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
      ssc, kafkaParams, fromOffsets, messageHandler)

    messages.foreachRDD { rdd =>
      // Business logic
      rdd.foreachPartition { iter =>
        iter.foreach(one => println(one.topic + "," + one.partition + "," + one.offset + "," + one.message()))
      }
      // Save the Kafka offsets
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      saveOffset(kc, offsetRanges, groupId)
    }

    ssc.start()
    ssc.awaitTermination()
  }
  def saveOffset(kc: KafkaCluster, offsetRanges: Array[OffsetRange], groupId: String): Unit = {
    var fromOffsets = Map[TopicAndPartition, Long]()
    offsetRanges.foreach { offsetRange =>
      fromOffsets += (offsetRange.topicAndPartition() -> offsetRange.untilOffset)
    }
    saveOffset(kc, fromOffsets, groupId)
  }

  def saveOffset(kc: KafkaCluster, fromOffsets: Map[TopicAndPartition, Long], groupId: String): Unit = {
    kc.setConsumerOffsets(groupId, fromOffsets)
  }

  /**
   * Read the stored offsets for the consumer group and
   * check whether they have already been removed by Kafka log cleanup.
   */
  def getOffset(kc: KafkaCluster, topics: Set[String], groupId: String): Map[TopicAndPartition, Long] = {
    val kafkaPartitionsISet = kc.getPartitions(topics).right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, kafkaPartitionsISet)
    var fromOffsets = Map[TopicAndPartition, Long]()
    var consumerOffsets: Map[TopicAndPartition, Long] = null
    if (consumerOffsetsE.isRight) {
      consumerOffsets = consumerOffsetsE.right.get
    }
    // The earliest offsets still available on the brokers; anything older has been cleaned up.
    val earliestLeaderOffsets = kc.getEarliestLeaderOffsets(kafkaPartitionsISet).right.get
    earliestLeaderOffsets.foreach { case (tp, leaderOffset) =>
      // Offset stored for this partition, or -1 if the group has no stored offset yet.
      val storedOffset =
        if (consumerOffsets != null) consumerOffsets.get(tp).getOrElse(-1L) else -1L
      // If the stored offset was never written or has already been cleaned up,
      // start from the earliest offset still available.
      fromOffsets += (tp -> Math.max(storedOffset, leaderOffset.offset))
    }
    fromOffsets
  }
}
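A note on packaging: KafkaCluster is marked private[spark] in at least some Spark 1.x releases, which is presumably why this Scala class is declared under an org.apache.spark.* package; the Java version further down is not affected, since Scala's package-private access is not enforced in the compiled bytecode. Below is a minimal build.sbt sketch for the Kafka 0.8 direct-stream integration these examples use; the Spark version is an assumption, align it with your cluster:

// Assumed versions -- adjust to match your Spark deployment (Kafka 0.8 integration)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.3" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)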
Java code
package liu.yuze.spark.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.test.JavaScalaConverter;

import scala.Option;
import scala.collection.mutable.ArrayBuffer;
import scala.util.Either;

public class TestDirectKafka2 {

    public static void main(String[] args) {
        // Hard-coded broker list and topic for local testing; normally these come from args.
        args = new String[] { "192.168.100.130:9091,192.168.100.130:9092", "test" };
        if (args.length < 2) {
            System.err.println("Invalid input arguments");
            System.exit(1);
        }

        boolean log4jInitialized = Logger.getRootLogger().getAllAppenders().hasMoreElements();
        if (!log4jInitialized) {
            Logger.getRootLogger().setLevel(Level.WARN);
        }

        String brokers = args[0];
        String topics = args[1];
        final String groupId = "group1";

        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local");
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1");
        // Graceful shutdown on kill -15 <pid>
        sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        // In yarn mode, run the driver only once instead of retrying on failure
        sparkConf.set("spark.yarn.maxAppAttempts", "1");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        final HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("group.id", groupId);

        // Look up the topic partitions and their starting offsets
        Set<String> topicSet = new HashSet<String>(Arrays.asList(topics.split(",")));
        scala.collection.immutable.Map<String, String> kafkaParamsIMap = JavaScalaConverter.convertAsIMap(kafkaParams);
        final KafkaCluster kc = new KafkaCluster(kafkaParamsIMap);
        HashMap<TopicAndPartition, Long> fromOffsets = getOffset(kc, topicSet, groupId);

        // Message handler that simply passes the MessageAndMetadata through
        Function<MessageAndMetadata<String, String>, MessageAndMetadata> function =
                new Function<MessageAndMetadata<String, String>, MessageAndMetadata>() {
            private static final long serialVersionUID = 1L;

            @Override
            public MessageAndMetadata<String, String> call(MessageAndMetadata<String, String> v1) throws Exception {
                return v1;
            }
        };

        JavaInputDStream<MessageAndMetadata> createDirectStream2 = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, MessageAndMetadata.class,
                kafkaParams, fromOffsets, function);

        createDirectStream2.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata>>() {
            @Override
            public void call(JavaRDD<MessageAndMetadata> rdd) throws Exception {
                // Business logic
                rdd.foreachPartition(new VoidFunction<Iterator<MessageAndMetadata>>() {
                    @Override
                    public void call(Iterator<MessageAndMetadata> t2) throws Exception {
                        while (t2.hasNext()) {
                            MessageAndMetadata next = t2.next();
                            System.out.println(next);
                        }
                    }
                });
                // Save the Kafka offsets
                OffsetRange[] offsetRanges = ((HasOffsetRanges) JavaRDD.toRDD(rdd)).offsetRanges();
                saveOffset(kc, offsetRanges, groupId);
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
    public static void saveOffset(KafkaCluster kc, OffsetRange[] offsetRanges, String groupId) {
        HashMap<TopicAndPartition, Object> fromOffsets = new HashMap<>();
        for (OffsetRange offsetRange : offsetRanges) {
            fromOffsets.put(offsetRange.topicAndPartition(), offsetRange.untilOffset());
        }
        saveOffset(kc, fromOffsets, groupId);
    }

    public static void saveOffset(KafkaCluster kc, Map<TopicAndPartition, Object> fromOffsets, String groupId) {
        scala.collection.immutable.Map<TopicAndPartition, Object> convertAsIMap = JavaScalaConverter.convertAsIMap(fromOffsets);
        kc.setConsumerOffsets(groupId, convertAsIMap);
    }

    /**
     * Read the stored offsets for the consumer group and
     * check whether they have already been removed by Kafka log cleanup.
     */
    public static HashMap<TopicAndPartition, Long> getOffset(KafkaCluster kc, Set<String> topicSet, String groupId) {
        scala.collection.immutable.Set<String> topicISet = JavaScalaConverter.convertAsISet(topicSet);
        scala.collection.immutable.Set<TopicAndPartition> kafkaPartitionsISet = kc.getPartitions(topicISet).right().get();
        Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, Object>> consumerOffsetsE =
                kc.getConsumerOffsets(groupId, kafkaPartitionsISet);
        Set<TopicAndPartition> kafkaPartitionsJSet = JavaScalaConverter.convertAsJSet(kafkaPartitionsISet);
        HashMap<TopicAndPartition, Long> fromOffsets = new HashMap<>();

        scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsets = null;
        if (consumerOffsetsE.isRight()) {
            consumerOffsets = consumerOffsetsE.right().get();
        }

        // The earliest offsets still available on the brokers; anything older has been cleaned up.
        scala.collection.immutable.Map<TopicAndPartition, LeaderOffset> earliestLeaderOffsets =
                kc.getEarliestLeaderOffsets(kafkaPartitionsISet).right().get();

        for (TopicAndPartition topicAndPartition : kafkaPartitionsJSet) {
            // From Java, Option.getOrElse expects a Function0, so use isDefined()/get() instead.
            Option<LeaderOffset> leaderOffsetO = earliestLeaderOffsets.get(topicAndPartition);
            if (leaderOffsetO.isDefined()) {
                long leaderOffsetL = leaderOffsetO.get().offset();
                // Offset stored for the consumer group, or -1 if there is none.
                long zkOffset = -1;
                if (consumerOffsets != null) {
                    Option<Object> oValue = consumerOffsets.get(topicAndPartition);
                    if (oValue.isDefined()) {
                        zkOffset = (Long) oValue.get();
                    }
                }
                // If the stored offset has already been cleaned up, start from the earliest available offset.
                Long offset = Math.max(leaderOffsetL, zkOffset);
                fromOffsets.put(topicAndPartition, offset);
            } else {
                fromOffsets.put(topicAndPartition, 0L);
            }
        }
        return fromOffsets;
    }
}
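The Java version above imports org.apache.spark.test.JavaScalaConverter, a small Scala helper that converts Java collections into the immutable Scala collections expected by KafkaCluster (and back); its source follows.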
package org.apache.spark.test

object JavaScalaConverter {

  def main(args: Array[String]): Unit = {
    println("dddd")
  }

  def convertAsIMap[K, V](jmap: java.util.Map[K, V]): collection.immutable.Map[K, V] = {
    val mMap = collection.JavaConversions.mapAsScalaMap(jmap)
    collection.immutable.Map(mMap.toSeq: _*)
  }

  def convertAsISet[K](jset: java.util.Set[K]): collection.immutable.Set[K] = {
    val mset = collection.JavaConversions.asScalaSet(jset)
    collection.immutable.Set(mset.toSeq: _*)
  }

  def convertAsJSet[K](iSet: collection.immutable.Set[K]): java.util.Set[K] = {
    collection.JavaConversions.setAsJavaSet(iSet)
  }
}
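If the Java side ever needs the stored offsets back as a plain java.util.Map, a symmetrical helper can be added. This convertAsJMap method is a hypothetical addition (it is not used by the code above), sketched with the same JavaConversions API:

def convertAsJMap[K, V](iMap: collection.immutable.Map[K, V]): java.util.Map[K, V] = {
  // mapAsJavaMap exposes the Scala map as a read-only java.util.Map view
  collection.JavaConversions.mapAsJavaMap(iMap)
}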