求助:关于spark广播hbase的HConnection报错问题
我要在不通过的task里查询不同条件的hbase数据,所以把HConnection广播出来,但是报错了final Broadcast<HConnection> broadconfiguration= jsc.broadcast(connection);---这行包如下错误
Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
conf (org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1318)
at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
at examples.Hbase_demo.main(Hbase_demo.java:381)
我照着网上的方法进行序列化,不知道这里有没有问题:
public class MyRegistrator implements KryoRegistrator {
public void registerClasses(Kryo kryo) {
kryo.register(org.apache.hadoop.hbase.io.ImmutableBytesWritable.class);
kryo.register(org.apache.hadoop.hbase.client.Result.class);
kryo.register(org.apache.hadoop.hbase.client.HConnection.class);
kryo.register(org.apache.hadoop.hbase.client.HConnectionManager.class);
kryo.register(org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation.class);
}
}
代码没有的问题。这里给楼主找一个类似的帖子,希望对楼主有所帮助
###################################################
讲 kafka对象 放到 pool 并通过broadcast广播出去:
然后 在开发测试阶段 报错如下:
然后就找我,说“代码都跟你的差不多呀,为什么就报这个错呢?”
其实 对于广播操作,spark 肯定要序列号的,还有尽量不要把大对象广播出去,
后来 把代码要过来看了下,发现 createKafkaProducerPool这个方法 ,单独创建了一个类,同时这个类 extends Serializable ,我当时的感觉就是,如果createKafkaProducerPool方法 ,写在main方法 or Driver端 应该就肯定不会有这个问题,我也建议这样搞的,还有 我怀疑 集群是启用了Kryo序列号方式,而createKafkaProducerPool方法所在类竟然 extends Serializable ,不解important:The closures (anon function going inside RDD.map(…)) are serialized by Spark before distributing them. Hadoop does not have this problem because it binary-serializes the whole .jar and copies it over the network. Spark uses JavaSerialization by default, but it is very slow compared to, say, Kryo. So we use Kryo to do that by using a wrapper (Spark doesn’t support kryo-serde for closures, not yet).And uptill now the org.dbpedia.extraction.spark.serializeKryoSerializationWrapper class has been working perfectly. Some freak extractors seem to fail though.根据这个错误检索的文章
[*]https://github.com/dbpedia/distributed-extraction-framework/issues/9
[*]http://stackoverflow.com/questions/27277015/sparkcontext-broadcast-jedispool-not-work
[*]http://apache-spark-user-list.1001560.n3.nabble.com/why-does-quot-com-esotericsoftware-kryo-KryoException-java-u-til-ConcurrentModificationException-quo-tc23067.html
#########################################
Java 原生序列化试试
http://www.aboutyun.com/blog-1328-2789.html
谢谢 bob007 和 Alkaloid0515
我试了java原生的序列化,不用kryo。也是不行。
我试了试先把HConnection或者HTableInterface先用kryo或者java序列化成字符串后在进行广播,还是报之前的错。
bob007 发表于 2015-11-16 12:37
Java 原生序列化试试
http://www.aboutyun.com/blog-1328-2789.html
我试了你说的方法。还是不行这行ObjectSerializeUtil.getStrFromObj(ObjectSerializeUtil.java:52)报这个错
java.io.NotSerializableException: org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation。
我觉得根本原因是HConnectionManager$HConnectionImplementation无法被序列化,但是这个类是封装好的,我没法给他加序列化的方法。
我接触java时间也不长,不知道怎么能将封装类序列化。
页:
[1]