This is the Java version, based on Spark 1.6 and Kafka 0.8.
[mw_shl_code=java,true]import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Tuple2;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/**
* Created by shengjk1 on 2016/10/8.
* Blog Address:http://blog.csdn.net/jsjsjs1789
*/
// NOTE: the enclosing class and its "logger" field are omitted in the original snippet.
public static JavaStreamingContext createContext() {
    final SparkConf conf = new SparkConf().setAppName("scan");
    final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));
    final HiveContext sqlContext = new HiveContext(jssc.sc());
    final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
    // with only a checkpoint, the job restarts automatically after a failure, which can still lose data
    //sqlContext.setConf("hive.optimize.ppd", "false");
    //jssc.checkpoint("hdfs://centos11:9000/cp");
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "centos11:9092");
    Set<String> topics = new HashSet<>();
    topics.add("20161008");

    final String zkServer = "centos12:2181";
    //ZkClient zkClient = new ZkClient(zkServer, 60 * 1000, 60 * 1000);
    ZkClient zkClient = new ZkClient(zkServer);
    final String topic = "20161008";

    // Kafka consumer group id
    ZKGroupTopicDirs zgt = new ZKGroupTopicDirs("test-consumer-group", topic);
    final String zkTopicPath = zgt.consumerOffsetDir();
    int countChildren = zkClient.countChildren(zkTopicPath);
    Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
    // each child node corresponds to one partition
    if (countChildren > 0) {
        for (int i = 0; i < countChildren; i++) {
            String path = zkTopicPath + "/" + i;
            logger.info("======================== ZK path " + path);
            String offset = zkClient.readData(path);
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, i);
            fromOffsets.put(topicAndPartition, Long.parseLong(offset));
        }
        KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParams,
                fromOffsets,
                new Function<MessageAndMetadata<String, String>, String>() {
                    @Override
                    public String call(MessageAndMetadata<String, String> v1) throws Exception {
                        return v1.message();
                    }
                }
        ).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
                // capture the offset ranges of this batch before any further transformation
                OffsetRange[] offsets = ((HasOffsetRanges) v1.rdd()).offsetRanges();
                logger.info("======================= offsets " + offsets);
                offsetRanges.set(offsets);
                return v1;
            }
        }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> stringJavaRDD1) throws Exception {
                // business logic
                // ......
                // write the offsets back to ZK
                ZkClient zkClient = new ZkClient(zkServer);
                OffsetRange[] offsets = offsetRanges.get();
                if (null != offsets) {
                    logger.info("scan =================== updating ZK offsets, count: " + offsets.length);
                    ZKGroupTopicDirs zgt = new ZKGroupTopicDirs("test-consumer-group", topic);
                    String zkTopicPath = zgt.consumerOffsetDir();
                    for (OffsetRange o : offsets) {
                        String zkPath = zkTopicPath + "/" + o.partition();
                        ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset() + "");
                        logger.info("scan =================== ZK updated, path: " + zkPath);
                    }
                    zkClient.close();
                }
            }
        });
    } else {
        KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topics
        ).transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
            @Override
            public JavaPairRDD<String, String> call(JavaPairRDD<String, String> v1) throws Exception {
                OffsetRange[] offsets = ((HasOffsetRanges) v1.rdd()).offsetRanges();
                logger.info("= else branch ======================= offsets " + offsets);
                offsetRanges.set(offsets);
                return v1;
            }
        }).map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> v1) throws Exception {
                return v1._2();
            }
        }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> stringJavaRDD) throws Exception {
                // business logic
                // ......
                // write the offsets back to ZK
                ZkClient zkClient = new ZkClient(zkServer);
                OffsetRange[] offsets = offsetRanges.get();
                if (null != offsets) {
                    logger.info("scan =================== updating ZK offsets, count: " + offsets.length);
                    for (OffsetRange o : offsets) {
                        String zkPath = zkTopicPath + "/" + o.partition();
                        ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset() + "");
                        logger.info("scan =================== ZK updated, path: " + zkPath);
                    }
                    zkClient.close();
                }
            }
        });
    }
    return jssc;
}
[/mw_shl_code]
The biggest problem is that there is far too much duplicated code. I tried refactoring it by returning a common parent type of JavaInputDStream and JavaPairDStream, but that ultimately failed with type-cast errors. I also tried sharing a single ZkClient, but then the ZkClient has to be final, and the ZkClient from the org.I0Itec.zkclient package cannot be serialized. Given time, you could write your own zkClient wrapper; that should get rid of the duplicated ZkClient code (see the sketch below).
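As a rough sketch of that last idea, one way around the serialization problem is not to ship the ZkClient at all, but to ship only the connection string and create the client lazily on each executor. The class name ZkClientHolder below is purely illustrative, not from the original post:
[mw_shl_code=java,true]import org.I0Itec.zkclient.ZkClient;

import java.io.Serializable;

// Hypothetical helper: holds the connection string (which is serializable)
// and re-creates the ZkClient lazily wherever the object is deserialized.
public class ZkClientHolder implements Serializable {
    private final String zkServer;
    private transient ZkClient zkClient; // not serialized; rebuilt on first use

    public ZkClientHolder(String zkServer) {
        this.zkServer = zkServer;
    }

    public synchronized ZkClient get() {
        if (zkClient == null) {
            zkClient = new ZkClient(zkServer, 60 * 1000, 60 * 1000);
        }
        return zkClient;
    }
}
[/mw_shl_code]
An instance created on the driver can then be captured by the foreachRDD closures as a final variable, and each executor builds its own connection on first use.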
I also looked at the corresponding Scala code; the reason it has no duplication is that Scala can infer the return type automatically.
The simple, brute-force way to remove the duplication is to wrap the repeated code in a helper, as in the sketch below.
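For example, the duplicated write-back block inside the two foreachRDD calls could be pulled into one static helper living in the same class as createContext() (so the imports above apply); the method name saveOffsetsToZk is just illustrative:
[mw_shl_code=java,true]// Illustrative helper extracted from the two foreachRDD bodies above:
// writes each partition's untilOffset back to the consumer-offset path in ZK.
private static void saveOffsetsToZk(String zkServer, String groupId, String topic, OffsetRange[] offsets) {
    if (offsets == null) {
        return;
    }
    ZkClient zkClient = new ZkClient(zkServer);
    try {
        String zkTopicPath = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir();
        for (OffsetRange o : offsets) {
            String zkPath = zkTopicPath + "/" + o.partition();
            ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset() + "");
        }
    } finally {
        zkClient.close();
    }
}
[/mw_shl_code]
Each foreachRDD body then reduces to the business logic plus a single saveOffsetsToZk(zkServer, "test-consumer-group", topic, offsetRanges.get()) call.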
Notes:
1. In particular, for Spark Streaming reading from Kafka, checkpointing alone can still lead to data loss and cannot guarantee exactly-once processing. To be precise: if the failure is caused by the Spark code itself, checkpoints can give you exactly-once; but if the Spark code finishes and the program then fails while inserting into the database, even checkpointing cannot guarantee exactly-once. One common way to close that gap is sketched below.
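A common approach to that last failure mode (not covered in the original post) is to persist the batch results and the offsets in the same database transaction, or to make the writes idempotent. A rough JDBC-flavored sketch, where the connection URL and the stream_offsets table are purely hypothetical:
[mw_shl_code=java,true]// Hypothetical sketch: commit the batch results and the Kafka offsets atomically,
// so a crash between "write results" and "save offsets" cannot double-count a batch.
// The JDBC URL and the stream_offsets table are made up for illustration.
private static void saveResultsAndOffsets(OffsetRange[] offsets) throws Exception {
    java.sql.Connection conn =
            java.sql.DriverManager.getConnection("jdbc:mysql://dbhost:3306/db", "user", "pwd");
    try {
        conn.setAutoCommit(false);
        // 1) write the (idempotent) business results, e.g. INSERT ... ON DUPLICATE KEY UPDATE
        // 2) write the offsets in the same transaction
        java.sql.PreparedStatement ps = conn.prepareStatement(
                "REPLACE INTO stream_offsets(topic, part, offset) VALUES (?, ?, ?)");
        for (OffsetRange o : offsets) {
            ps.setString(1, o.topic());
            ps.setInt(2, o.partition());
            ps.setLong(3, o.untilOffset());
            ps.addBatch();
        }
        ps.executeBatch();
        conn.commit();   // results and offsets become visible together
    } catch (Exception e) {
        conn.rollback(); // neither results nor offsets are persisted
        throw e;
    } finally {
        conn.close();
    }
}
[/mw_shl_code]
If a batch is replayed after a crash, it either finds the offsets already committed or redoes both writes together.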
2. The offset-update code in this version targets Kafka 0.8; with Kafka 0.9 it should look like this instead:
[mw_shl_code=java,true]new ZkUtils(zkClient, null, false).updatePersistentPath(zkPath, o.untilOffset() + "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
[/mw_shl_code]
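Put in context, the Kafka 0.9 variant of the ZK write-back loop would look roughly like the sketch below, assuming the 0.9 kafka.utils.ZkUtils and org.apache.zookeeper.ZooDefs are on the classpath:
[mw_shl_code=java,true]// Rough sketch: in Kafka 0.9, ZkUtils is instantiated instead of being used as a
// static Scala object; passing null for the ZkConnection is enough for path updates.
ZkClient zkClient = new ZkClient(zkServer);
ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
OffsetRange[] offsets = offsetRanges.get();
if (null != offsets) {
    for (OffsetRange o : offsets) {
        String zkPath = zkTopicPath + "/" + o.partition();
        zkUtils.updatePersistentPath(zkPath, o.untilOffset() + "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }
}
zkClient.close();
[/mw_shl_code]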
From: http://blog.csdn.net/jsjsjs1789/article/details/52823218