分享

【求意见】如何使用spark streaming接收kafka中发送的自定义对象?

Cherise 发表于 2015-5-29 14:40:10 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 10 69188
大家好,有个问题想咨询下,使用spark streaming无法接收kafka中自定义的对象,具体情况见下:
环境:spark 1.3.1, kafka 0.8.2.0(部署在cdhdatanode2机器),zookeeper 3.4.5(部署在cdhmanager机器)
kafka producer端代码如下:
[mw_shl_code=java,true]Properties props = new Properties();
                props.setProperty("metadata.broker.list", "cdhdatanode2:9092");
                props.setProperty("serializer.class", "com.cherise.spark.streaming.kafka.simpleTest.StudentSerializer");
                props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
                props.put("request.required.acks", "1");
                ProducerConfig config = new ProducerConfig(props);
                Producer<String, Student> producer = new Producer<String, Student>(
                                config);
                Student s = new Student();
                s.setId("1212");
                s.setName("Lily");
                KeyedMessage<String, Student> data = new KeyedMessage<String, Student>(
                                "kafka", "student", s);
                producer.send(data);
                producer.close();[/mw_shl_code]
其中Student是自定义的pojo对象,包括两个字段id和name。
StudentSerializer实现了Encoder<Student>,使用SerializationUtils.serialize方法将Student对象转化为byte[]。
[mw_shl_code=java,true]public class StudentSerializer implements Encoder<Student>{
        public StudentSerializer(){
        }
        public StudentSerializer(VerifiableProperties properties) {
        }
@Override
        public byte[] toBytes(Student s) {
                return SerializationUtils.serialize(s);
        }
}[/mw_shl_code]
与之对应,下面代码中使用的StudentDecoder实现了Decoder<Student>,使用SerializationUtils.deserialize将byte[]转化为Student
[mw_shl_code=java,true]public class StudentDecoder implements Decoder<Student>{
        public StudentDecoder(){               
        }        
        public StudentDecoder(VerifiableProperties properties) {            
        }
        @Override
        public Student fromBytes(byte[] in) {
                return (Student) SerializationUtils.deserialize(in);
        }
}[/mw_shl_code]

spark streaming代码如下:
[mw_shl_code=java,true]SparkConf sparkConf = new SparkConf();
                sparkConf.setAppName("KafkaTest");
                sparkConf.setMaster("spark://cdhnamenode:7077");
                sparkConf.set("driver.memory", "512m");
                sparkConf.set("spark.executor.memory", "512m");
                sparkConf.set("spark.cores.max", "2");
                sparkConf.setJars(new String[] { "/tmp/KafkaTest.jar",
                                "/tmp/kafka/spark-streaming-kafka-assembly_2.10-1.3.1.jar"});
                JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                                new Duration(10000));

                Map<String, Integer> topicMap = new HashMap<String, Integer>();
                // 1表示numThreads
                topicMap.put("kafka", 1);

                Map<String, String> kafkaParm = new HashMap<String, String>();
                kafkaParm.put("zookeeper.connect", "cdhmanager:2181");
                kafkaParm.put("group.id", "0");                JavaPairReceiverInputDStream<String, Student> stream = KafkaUtils
                                .createStream(
                                                jssc,
                                                String.class,
                                                com.cherise.spark.streaming.kafka.simpleTest.Student.class,
                                                kafka.serializer.StringDecoder.class,
                                                com.cherise.spark.streaming.kafka.simpleTest.StudentDecoder.class,
                                                kafkaParm, topicMap, StorageLevels.MEMORY_AND_DISK_2);
                JavaDStream<String> value = stream
                                .map(new Function<Tuple2<String, Student>, String>() {

                                        private static final long serialVersionUID = -8405619055497641802L;

                                        @Override
                                        public String call(Tuple2<String, Student> in)
                                                        throws Exception {
                                                return "student:" + in._2().getId() + ": "
                                                                + in._2().getName() + "!";
                                        }

                                });
                value.print();
                jssc.start();
                jssc.awaitTermination();[/mw_shl_code]

启动spark streaming之后,执行kafka producer端代码,但spark streaming一直接收不到数据,输出的日志中没有“Added input”字样

而如果把spark streaming改为接收String,却可以接收到Kafka Producer端发送的Student对象
[mw_shl_code=java,true]JavaPairReceiverInputDStream<String, String> stream = KafkaUtils
                                .createStream(jssc, String.class, String.class,
                                                kafka.serializer.StringDecoder.class,
                                                kafka.serializer.StringDecoder.class, kafkaParm,
                                                topicMap, StorageLevels.MEMORY_AND_DISK_2);
                JavaDStream<String> value = stream
                                .map(new Function<Tuple2<String, String>, String>() {

                                        private static final long serialVersionUID = -8405619055497641802L;

                                        @Override
                                        public String call(Tuple2<String, String> in)
                                                        throws Exception {

                                                return "student:" + in._2 + "!";
                                        }

                                });[/mw_shl_code]

接收到的数据输出见图片;
out.png
大家知不知道是怎么回事?要怎么解决呢?谢谢

已有(10)人评论

跳转到指定楼层
langke93 发表于 2015-5-29 16:11:40
这个例子跟楼主的比较相似,参考这个试试
spark streaming 接收 kafka 数据示例
回复

使用道具 举报

Cherise 发表于 2015-5-29 17:00:03
langke93 发表于 2015-5-29 16:11
这个例子跟楼主的比较相似,参考这个试试
spark streaming 接收 kafka 数据示例

谢谢,但是示例中接收的是string,我是可以正常接收string的。
回复

使用道具 举报

muyannian 发表于 2015-5-29 17:24:59
Cherise 发表于 2015-5-29 17:00
谢谢,但是示例中接收的是string,我是可以正常接收string的。

楼主传递的什么??
传递的过程中一般都是以字符串的方式,接受到后在对字符串转换
回复

使用道具 举报

Cherise 发表于 2015-5-29 17:42:14
muyannian 发表于 2015-5-29 17:24
楼主传递的什么??
传递的过程中一般都是以字符串的方式,接受到后在对字符串转换

是对象,如上面示例中传入的是Student对象:
[mw_shl_code=java,true]public class Student implements Serializable{
       
        private String id;
        private String name;
       
        public Student(){
               
        }
       
        public Student(String id, String name){
                this.id = id;
                this.name = name;
        }

        public String getId() {
                return id;
        }

        public void setId(String id) {
                this.id = id;
        }

        public String getName() {
                return name;
        }

        public void setName(String name) {
                this.name = name;
        }
       
       
       
}[/mw_shl_code]
回复

使用道具 举报

yuwenge 发表于 2015-5-29 18:14:04
Cherise 发表于 2015-5-29 17:42
是对象,如上面示例中传入的是Student对象:
[mw_shl_code=java,true]public class Student implements  ...

虽然对spark了解不是太多,但是如果想传递对象,都是先序列化为字符串,然后在反序列化
回复

使用道具 举报

Cherise 发表于 2015-5-30 09:13:28
yuwenge 发表于 2015-5-29 18:14
虽然对spark了解不是太多,但是如果想传递对象,都是先序列化为字符串,然后在反序列化

恩,谢谢,主要是参考了http://www.iteblog.com/archives/1296#comment-21881这篇文章中的方法,可是却接收不到……
回复

使用道具 举报

Cherise 发表于 2015-5-30 11:30:00
参考了高人的意见,我现在可以正常接收到数据,但是解码的时候出现了新的问题:
15/05/30 11:09:03 ERROR KafkaReceiver: Error handling message; exiting
org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.cherise.spark.streaming.kafka.simpleTest.Student
        at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
        at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
        at com.cherise.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:66)
        at com.cherise.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:1)
        at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
        at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:134)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

我是使用如下命令提交的任务,而且上面所说的Student.class就在KafkaTest.jar包中
bin/spark-submit --jars /opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar --class com.cherise.spark.streaming.kafka.simpleTest.KafkaTest /tmp/KafkaTest.jar

并且在构建sparkContext时,使用sparkConf.setJars(new String[] { "/tmp/KafkaTest.jar", "/opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar"});添加了jar

在Application UI中的Classpath Entries中也看到如下信息
http://192.168.0.53:60922/jars/KafkaTest.jar        Added By User
http://192.168.0.53:60922/jars/s ... mbly_2.10-1.3.1.jar        Added By User

对于上面的问题,大家有什么建议吗?谢谢^_^
回复

使用道具 举报

Alkaloid0515 发表于 2015-5-30 14:07:06
Cherise 发表于 2015-5-30 11:30
参考了高人的意见,我现在可以正常接收到数据,但是解码的时候出现了新的问题:
15/05/30 11:09:03 ERROR  ...

楼主确认
/tmp/KafkaTest.jar含有
.com.cherise.spark.streaming.kafka.simpleTest.Student
是.simpleTest.Student,而不是Test.Student,

回复

使用道具 举报

Cherise 发表于 2015-6-1 09:42:18
Alkaloid0515 发表于 2015-5-30 14:07
楼主确认
/tmp/KafkaTest.jar含有
.com.cherise.spark.streaming.kafka.simpleTest.Student

确实是com.cherise.spark.streaming.kafka.simpleTest.Student呢
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条