Hi everyone, I have a question I'd like to ask about: Spark Streaming cannot receive a custom object sent through Kafka. Details below:
Environment: Spark 1.3.1, Kafka 0.8.2.0 (deployed on the cdhdatanode2 machine), ZooKeeper 3.4.5 (deployed on the cdhmanager machine)
The Kafka producer code is as follows:
[mw_shl_code=java,true]import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.setProperty("metadata.broker.list", "cdhdatanode2:9092");
// Custom encoder for the message value, StringEncoder for the key.
props.setProperty("serializer.class", "com.cherise.spark.streaming.kafka.simpleTest.StudentSerializer");
props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);
Producer<String, Student> producer = new Producer<String, Student>(config);

Student s = new Student();
s.setId("1212");
s.setName("Lily");

// topic = "kafka", key = "student", value = the Student object
KeyedMessage<String, Student> data = new KeyedMessage<String, Student>("kafka", "student", s);
producer.send(data);
producer.close();[/mw_shl_code]
Student is a custom POJO with two fields, id and name.
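(For reference, a minimal sketch of what Student presumably looks like; the actual class is not shown in this post. Since SerializationUtils.serialize requires its argument to implement java.io.Serializable, that is assumed here.)
[mw_shl_code=java,true]import java.io.Serializable;

// Assumed shape of the POJO; the real class may differ.
// SerializationUtils.serialize/deserialize require java.io.Serializable.
public class Student implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}[/mw_shl_code]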
StudentSerializer implements Encoder<Student> and uses SerializationUtils.serialize to turn a Student object into a byte[]:
[mw_shl_code=java,true]import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
// or org.apache.commons.lang.SerializationUtils, depending on the commons-lang version
import org.apache.commons.lang3.SerializationUtils;

public class StudentSerializer implements Encoder<Student> {
    public StudentSerializer() {
    }

    // Kafka instantiates encoders through this constructor.
    public StudentSerializer(VerifiableProperties properties) {
    }

    @Override
    public byte[] toBytes(Student s) {
        return SerializationUtils.serialize(s);
    }
}[/mw_shl_code]
Correspondingly, the StudentDecoder used in the code further below implements Decoder<Student> and uses SerializationUtils.deserialize to turn the byte[] back into a Student (a quick round-trip check of the two classes is sketched right after this block):
[mw_shl_code=java,true]import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
// or org.apache.commons.lang.SerializationUtils, depending on the commons-lang version
import org.apache.commons.lang3.SerializationUtils;

public class StudentDecoder implements Decoder<Student> {
    public StudentDecoder() {
    }

    // Kafka instantiates decoders through this constructor.
    public StudentDecoder(VerifiableProperties properties) {
    }

    @Override
    public Student fromBytes(byte[] in) {
        return (Student) SerializationUtils.deserialize(in);
    }
}[/mw_shl_code]
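As an aside, the encoder/decoder pair can be exercised outside of Spark with a quick round-trip check; this is only a sketch, assuming the Student class and the no-argument constructors shown above:
[mw_shl_code=java,true]// Stand-alone sanity check of the custom encoder/decoder pair (not part of the job).
Student s = new Student();
s.setId("1212");
s.setName("Lily");

byte[] bytes = new StudentSerializer().toBytes(s);
Student decoded = new StudentDecoder().fromBytes(bytes);

// Should print "1212 Lily" if serialization round-trips correctly.
System.out.println(decoded.getId() + " " + decoded.getName());[/mw_shl_code]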
The Spark Streaming code is as follows:
[mw_shl_code=java,true]import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("KafkaTest");
sparkConf.setMaster("spark://cdhnamenode:7077");
// Note: the config key is spark.driver.memory (not driver.memory).
sparkConf.set("spark.driver.memory", "512m");
sparkConf.set("spark.executor.memory", "512m");
sparkConf.set("spark.cores.max", "2");
// Ship the application jar and the Kafka assembly jar to the executors.
sparkConf.setJars(new String[] { "/tmp/KafkaTest.jar",
        "/tmp/kafka/spark-streaming-kafka-assembly_2.10-1.3.1.jar" });

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

// topic -> number of receiver threads (numThreads)
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("kafka", 1);

Map<String, String> kafkaParm = new HashMap<String, String>();
kafkaParm.put("zookeeper.connect", "cdhmanager:2181");
kafkaParm.put("group.id", "0");

JavaPairReceiverInputDStream<String, Student> stream = KafkaUtils.createStream(
        jssc,
        String.class,
        com.cherise.spark.streaming.kafka.simpleTest.Student.class,
        kafka.serializer.StringDecoder.class,
        com.cherise.spark.streaming.kafka.simpleTest.StudentDecoder.class,
        kafkaParm, topicMap, StorageLevels.MEMORY_AND_DISK_2);

JavaDStream<String> value = stream
        .map(new Function<Tuple2<String, Student>, String>() {
            private static final long serialVersionUID = -8405619055497641802L;

            @Override
            public String call(Tuple2<String, Student> in) throws Exception {
                return "student:" + in._2().getId() + ": " + in._2().getName() + "!";
            }
        });

value.print();
jssc.start();
jssc.awaitTermination();[/mw_shl_code]
After starting the Spark Streaming job, I run the Kafka producer code, but Spark Streaming never receives any data, and the output logs contain no "Added input" lines.
However, if I change Spark Streaming to receive Strings, it does receive the Student objects sent by the Kafka producer:
[mw_shl_code=java,true]JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
        jssc, String.class, String.class,
        kafka.serializer.StringDecoder.class,
        kafka.serializer.StringDecoder.class,
        kafkaParm, topicMap, StorageLevels.MEMORY_AND_DISK_2);

JavaDStream<String> value = stream
        .map(new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = -8405619055497641802L;

            @Override
            public String call(Tuple2<String, String> in) throws Exception {
                return "student:" + in._2 + "!";
            }
        });[/mw_shl_code]
The output of the received data is shown in the attached screenshot.
Does anyone know what is going on here, and how can it be fixed? Thanks!