分享

Spark写HBase报错

高帝斯法则 发表于 2014-10-22 15:20:40 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 10 31952
通过Spark向HBase添加记录,主要代码如下
                String tableName = "test";               
                conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
                Put put = new Put(Bytes.toBytes("r1"));
                put.add(Bytes.toBytes("cf1"), Bytes.toBytes("q3"), Bytes.toBytes("13"));
                Tuple2<ImmutableBytesWritable, Put> tuple = new Tuple2<ImmutableBytesWritable, Put>(null, put);
                List<Tuple2<ImmutableBytesWritable, Put>> list = new ArrayList<Tuple2<ImmutableBytesWritable, Put>>();
                list.add(tuple);
                context.parallelizePairs(list).saveAsNewAPIHadoopDataset(conf);

运行之后报一下错误:
ERROR OneForOneStrategy: org.apache.hadoop.hbase.client.Put
java.io.NotSerializableException: org.apache.hadoop.hbase.client.Put
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1346)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1154)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:422)
        at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:59)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:940)
HBase源码看了之后,发现put确实没有序列化,Spark操作HBase的方法好难整啊,求大神帮忙

已有(10)人评论

跳转到指定楼层
howtodown 发表于 2014-10-22 15:32:48
可以自己尝试重写put,让它序列化
回复

使用道具 举报

bioger_hit 发表于 2014-10-22 16:06:22
这方面还是比较困难的,给你两个链接,参考下

http://stackoverflow.com/questions/25250774/writing-to-hbase-via-spark-task-not-serializable




http://stackoverflow.com/questions/23619775/spark-serialization-error

回复

使用道具 举报

高帝斯法则 发表于 2014-10-22 16:54:39
回复

使用道具 举报

bioger_hit 发表于 2014-10-22 19:40:24
高帝斯法则 发表于 2014-10-22 16:54
scala没接触过,看不懂啊,有木有Java版的
这个是Java版的,只不过spark接触的不是太深,看不太懂,你或许可以看懂

http://stackoverflow.com/questions/25250774/writing-to-hbase-via-spark-task-not-serializable
回复

使用道具 举报

guxingyu 发表于 2015-3-20 14:45:37
请问楼主,这个问题解决了没有,我也遇到了类似的问题
回复

使用道具 举报

高帝斯法则 发表于 2015-3-20 16:46:39
guxingyu 发表于 2015-3-20 14:45
请问楼主,这个问题解决了没有,我也遇到了类似的问题

解决了,但使用了另外的方式,你遇到的是什么问题?如果是序列化的问题,将相关类序列化就可以了
回复

使用道具 举报

guxingyu 发表于 2015-3-20 17:28:05
ImmutableBytesWritable 该类没有序列化,我自己写了一个类继承它,然后实现序列化接口,但是方法中要传人的就是要这个类,自己序列化的子类压根就不能传入,楼主你是怎么解决的
回复

使用道具 举报

高帝斯法则 发表于 2015-3-23 08:28:11
guxingyu 发表于 2015-3-20 17:28
ImmutableBytesWritable 该类没有序列化,我自己写了一个类继承它,然后实现序列化接口,但是方法中要传人的就 ...

能把代码贴出来看下么?
回复

使用道具 举报

guxingyu 发表于 2015-3-23 15:08:03
JavaSparkContext sc = new JavaSparkContext("local", "hbaseTest","/home/hadoop/spark-1.3.0",new String[0]);
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.quorum",
                       "192.168.1.51,192.168.1.52,192.168.1.53,192.168.1.54,192.168.1.55,192.168.1.56,192.168.1.57,192.168.1.58,192.168.1.59");
                conf.set("hbase.zookeeper.property.clientPort", "12181");
                conf.setBoolean("mapred.map.tasks.speculative.execution", false);
                conf.set(TableInputFormat.INPUT_TABLE, "students");
  
            //获得hbase查询结果Result
            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
                TableInputFormat.class, ImmutableBytesWritable.class,
                Result.class);
            System.out.println(hBaseRDD.first());
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条