【求意见】如何使用spark streaming接收kafka中发送的自定义对象?

查看数: 69193 | 评论数: 10 | 收藏 1
关灯 | 提示:支持键盘翻页<-左 右->
    组图打开中,请稍候......
发布时间: 2015-5-29 14:40

正文摘要:

大家好,有个问题想咨询下,使用spark streaming无法接收kafka中自定义的对象,具体情况见下: 环境:spark 1.3.1, kafka 0.8.2.0(部署在cdhdatanode2机器),zookeeper 3.4.5(部署在cdhmanager机器) kafka pr ...

回复

Cherise 发表于 2015-6-1 12:12:15
问题解决了,使用文章http://www.iteblog.com/archives/1296中提到的“用ByteArrayOutputStream并结合ObjectOutputStream类将对象转换成Byte数组;并用ByteArrayInputStream结合ObjectInputStream类将Byte数组转换回对象”方法替换了SerializationUtils.serialize和SerializationUtils.deserialize,可以正常接收对象并进行解码了
Cherise 发表于 2015-6-1 09:42:18
Alkaloid0515 发表于 2015-5-30 14:07
楼主确认
/tmp/KafkaTest.jar含有
.com.cherise.spark.streaming.kafka.simpleTest.Student

确实是com.cherise.spark.streaming.kafka.simpleTest.Student呢
Alkaloid0515 发表于 2015-5-30 14:07:06
Cherise 发表于 2015-5-30 11:30
参考了高人的意见,我现在可以正常接收到数据,但是解码的时候出现了新的问题:
15/05/30 11:09:03 ERROR  ...

楼主确认
/tmp/KafkaTest.jar含有
.com.cherise.spark.streaming.kafka.simpleTest.Student
是.simpleTest.Student,而不是Test.Student,

Cherise 发表于 2015-5-30 11:30:00
参考了高人的意见,我现在可以正常接收到数据,但是解码的时候出现了新的问题:
15/05/30 11:09:03 ERROR KafkaReceiver: Error handling message; exiting
org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.cherise.spark.streaming.kafka.simpleTest.Student
        at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
        at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
        at com.cherise.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:66)
        at com.cherise.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:1)
        at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
        at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:134)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

我是使用如下命令提交的任务,而且上面所说的Student.class就在KafkaTest.jar包中
bin/spark-submit --jars /opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar --class com.cherise.spark.streaming.kafka.simpleTest.KafkaTest /tmp/KafkaTest.jar

并且在构建sparkContext时,使用sparkConf.setJars(new String[] { "/tmp/KafkaTest.jar", "/opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar"});添加了jar

在Application UI中的Classpath Entries中也看到如下信息
http://192.168.0.53:60922/jars/KafkaTest.jar        Added By User
http://192.168.0.53:60922/jars/s ... mbly_2.10-1.3.1.jar        Added By User

对于上面的问题,大家有什么建议吗?谢谢^_^
Cherise 发表于 2015-5-30 09:13:28
yuwenge 发表于 2015-5-29 18:14
虽然对spark了解不是太多,但是如果想传递对象,都是先序列化为字符串,然后在反序列化

恩,谢谢,主要是参考了http://www.iteblog.com/archives/1296#comment-21881这篇文章中的方法,可是却接收不到……
yuwenge 发表于 2015-5-29 18:14:04
Cherise 发表于 2015-5-29 17:42
是对象,如上面示例中传入的是Student对象:
[mw_shl_code=java,true]public class Student implements  ...

虽然对spark了解不是太多,但是如果想传递对象,都是先序列化为字符串,然后在反序列化
Cherise 发表于 2015-5-29 17:42:14
muyannian 发表于 2015-5-29 17:24
楼主传递的什么??
传递的过程中一般都是以字符串的方式,接受到后在对字符串转换

是对象,如上面示例中传入的是Student对象:
[mw_shl_code=java,true]public class Student implements Serializable{
       
        private String id;
        private String name;
       
        public Student(){
               
        }
       
        public Student(String id, String name){
                this.id = id;
                this.name = name;
        }

        public String getId() {
                return id;
        }

        public void setId(String id) {
                this.id = id;
        }

        public String getName() {
                return name;
        }

        public void setName(String name) {
                this.name = name;
        }
       
       
       
}[/mw_shl_code]
muyannian 发表于 2015-5-29 17:24:59
Cherise 发表于 2015-5-29 17:00
谢谢,但是示例中接收的是string,我是可以正常接收string的。

楼主传递的什么??
传递的过程中一般都是以字符串的方式,接受到后在对字符串转换
Cherise 发表于 2015-5-29 17:00:03
langke93 发表于 2015-5-29 16:11
这个例子跟楼主的比较相似,参考这个试试
spark streaming 接收 kafka 数据示例

谢谢,但是示例中接收的是string,我是可以正常接收string的。
关闭

推荐上一条 /2 下一条