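Problem solved. I followed the method described in http://www.iteblog.com/archives/1296 ("use ByteArrayOutputStream together with ObjectOutputStream to convert the object into a byte array, and use ByteArrayInputStream together with ObjectInputStream to convert the byte array back into an object") and used it to replace SerializationUtils.serialize and SerializationUtils.deserialize. The objects are now received and decoded correctly.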
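A minimal sketch of the byte-array round trip described above. The class name `ObjectBytes` and its methods are hypothetical (not from the thread or the linked article), and the nested `Student` is a simplified copy of the thread's class so the sketch compiles on its own. A plausible explanation for why the swap helps (not confirmed in the thread): a plain `ObjectInputStream` created inside your own decoder resolves classes through the class loader of the closest application class on the call stack, i.e. your jar's loader, while `SerializationUtils` is loaded from Spark's classpath and may resolve through a loader that cannot see `Student`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ObjectBytes {

    // Simplified copy of the thread's Student class, included only so this sketch is self-contained.
    public static class Student implements Serializable {
        private final String id;
        private final String name;

        public Student(String id, String name) {
            this.id = id;
            this.name = name;
        }

        public String getId() { return id; }
        public String getName() { return name; }
    }

    // Object -> byte[] via ByteArrayOutputStream + ObjectOutputStream.
    public static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            // Rethrown unchecked so this can back a Kafka Encoder/Decoder method that declares no checked exceptions.
            throw new RuntimeException("serialization failed", e);
        }
        // The stream is closed (and therefore flushed) at this point.
        return bytes.toByteArray();
    }

    // byte[] -> Object via ByteArrayInputStream + ObjectInputStream.
    public static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("deserialization failed", e);
        }
    }
}
```

In a decoder such as the thread's `StudentDecoder.fromBytes`, the body would then be roughly `return (Student) ObjectBytes.fromBytes(bytes);`, with the caveat that the helper class must itself live in your application jar for the class-loader reasoning above to apply.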
Replying to Alkaloid0515 (posted 2015-5-30 14:07): Yes, it is indeed com.cherise.spark.streaming.kafka.simpleTest.Student.
Replying to Cherise (posted 2015-5-30 11:30): OP, please confirm that /tmp/KafkaTest.jar contains com.cherise.spark.streaming.kafka.simpleTest.Student, i.e. that the class is under .simpleTest.Student and not Test.Student.
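One way to run this check from Java: a sketch using a hypothetical helper `JarCheck.jarContainsClass` (not from the thread) that maps the fully qualified class name to its jar entry path, e.g. `com/cherise/spark/streaming/kafka/simpleTest/Student.class`, and looks it up with `java.util.jar.JarFile`. Running `jar tf /tmp/KafkaTest.jar` lists the same entries from the command line.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarFile;

public class JarCheck {

    // Returns true if the jar contains an entry for the given fully qualified class name.
    public static boolean jarContainsClass(String jarPath, String className) {
        // A class "a.b.C" is stored in a jar as the entry "a/b/C.class".
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, `JarCheck.jarContainsClass("/tmp/KafkaTest.jar", "com.cherise.spark.streaming.kafka.simpleTest.Student")` should return `true` if the class was packaged under the expected package.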
Following advice from more experienced users, I can now receive the data, but decoding it fails with a new error:

15/05/30 11:09:03 ERROR KafkaReceiver: Error handling message; exiting
org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.cherise.spark.streaming.kafka.simpleTest.Student
    at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
    at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
    at com.cherise.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:66)
    at com.cherise.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:1)
    at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
    at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:134)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I submitted the job with the following command, and the Student.class mentioned above is inside KafkaTest.jar:

bin/spark-submit --jars /opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar --class com.cherise.spark.streaming.kafka.simpleTest.KafkaTest /tmp/KafkaTest.jar

When building the SparkContext, I also added the jars with sparkConf.setJars(new String[] { "/tmp/KafkaTest.jar", "/opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar"});

The Classpath Entries section of the Application UI also shows:

http://192.168.0.53:60922/jars/KafkaTest.jar Added By User
http://192.168.0.53:60922/jars/s ... mbly_2.10-1.3.1.jar Added By User

Does anyone have suggestions for this problem? Thanks ^_^
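A sketch of a different technique from the one that eventually solved this thread: subclass `ObjectInputStream` and override its `resolveClass` hook so classes are looked up through a class loader you pass in explicitly, for example `StudentDecoder.class.getClassLoader()` or `Thread.currentThread().getContextClassLoader()`. The class name `LoaderAwareObjectInputStream` is hypothetical, and whether the thread context loader of Spark's Kafka receiver thread can see your application jar is an assumption you would need to verify.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Hypothetical helper: an ObjectInputStream that resolves classes through an explicit ClassLoader.
public class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    public LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        if (loader != null) {
            try {
                // Look the class up through the supplied loader without initializing it.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Not visible to that loader (or a primitive type name): fall back to the default lookup below.
            }
        }
        return super.resolveClass(desc);
    }

    // Convenience wrapper; rethrows unchecked so it can be called from a Kafka Decoder's fromBytes.
    public static Object deserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in = new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("deserialization failed", e);
        }
    }
}
```

Passing the loader in explicitly makes the lookup independent of which jar the helper itself was loaded from, which is the failure mode the stack trace above suggests for `SerializationUtils`.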
Replying to yuwenge (posted 2015-5-29 18:14): Yes, thanks. I mainly followed the method in http://www.iteblog.com/archives/1296#comment-21881, but I still don't receive anything…
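Replying to Cherise (posted 2015-5-29 17:42): I don't know much about Spark, but when you want to pass objects around, you normally serialize them to a string first and then deserialize them on the other side.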
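A minimal sketch of the "serialize to a string" idea, assuming Java 8+ for `java.util.Base64`: serialize with `ObjectOutputStream`, then Base64-encode the bytes so the payload is plain ASCII text that a string-based Kafka encoder/decoder (such as Kafka 0.8's `kafka.serializer.StringEncoder`/`StringDecoder`) can carry unchanged. The class name `ObjectStrings` is hypothetical. Note that the string form alone does not fix a `ClassNotFoundException`: the receiving side still has to be able to load the class when deserializing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

public class ObjectStrings {

    // Serialize with ObjectOutputStream, then Base64-encode so the result is plain ASCII text.
    public static String toBase64(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new RuntimeException("serialization failed", e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Reverse: Base64-decode the text, then read the object back with ObjectInputStream.
    public static Object fromBase64(String text) {
        byte[] data = Base64.getDecoder().decode(text);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("deserialization failed", e);
        }
    }
}
```

The trade-off is size: Base64 makes the payload roughly a third larger than the raw byte array, so sending the bytes directly (as in the solution the thread settled on) is usually preferable when the transport accepts binary data.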
Replying to muyannian (posted 2015-5-29 17:24): It's an object. In the example above I'm sending a Student object:

[mw_shl_code=java,true]import java.io.Serializable;

// Implements Serializable so instances can be written with ObjectOutputStream and sent through Kafka.
public class Student implements Serializable {
    private String id;
    private String name;

    public Student() {
    }

    public Student(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }
}[/mw_shl_code]
Replying to Cherise (posted 2015-5-29 17:00): What are you sending, OP? Data is usually transmitted as a string and converted back after it is received.
Replying to langke93 (posted 2015-5-29 16:11): Thanks, but the example receives strings, and I can already receive strings without any problem.