分享

spark通过Filter过滤器读取Hbase数据报错

高帝斯法则 发表于 2014-11-3 16:00:11 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 10 66516
本帖最后由 高帝斯法则 于 2014-11-3 16:04 编辑

错误信息如下:
org.apache.hadoop.hbase.DoNotRetryIOException: Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?
        at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:403)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.nextKeyValue(TableRecordReaderImpl.java:232)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReader.nextKeyValue(TableRecordReader.java:138)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1014)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:861)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:861)
在google上看到有一哥们跟我的错误一样(地址:http://wefixbugs.com/blog/orgapa ... 7.html#.VFc0YPmUfl8),可是木有人解答,我的问题跟他很相似,也是通过过滤器count一下一共有多少数据。我试过直接count表中所有数据,运行没有问题,但是添加过滤器之后报以上错误,请大神指点迷津

已有(10)人评论

跳转到指定楼层
howtodown 发表于 2014-11-3 17:05:37
scan范围越界了吧。
贴出代码来看看
回复

使用道具 举报

howtodown 发表于 2014-11-3 17:11:18
参考这个篇,看看是否对你有帮助
hbase出现 OutOfOrderScannerNextException



回复

使用道具 举报

高帝斯法则 发表于 2014-11-3 17:14:56
howtodown 发表于 2014-11-3 17:05
scan范围越界了吧。
贴出代码来看看

恩,我试着改下试试,貌似是越界了。
回复

使用道具 举报

高帝斯法则 发表于 2014-11-4 09:35:26
howtodown 发表于 2014-11-3 17:05
scan范围越界了吧。
贴出代码来看看

JavaSparkContext context = new JavaSparkContext(
                                "spark://master1:7077", "SparkHBase");
               
                String tablename = "test";
                Scan scan = new Scan();
                scanFilter(scan);    //过滤器
                conf.set(TableInputFormat.INPUT_TABLE, tablename);


                ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
                String scanStr = Base64.encodeBytes(proto.toByteArray());
                conf.set(TableInputFormat.SCAN, scanStr);
       
                JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = context
                                .newAPIHadoopRDD(conf, TableInputFormat.class,
                                                ImmutableBytesWritable.class, Result.class);
               
                Long count = hBaseRDD.count();
                System.out.println("count:"+count+",count duration"+duration_2);
添加红色标注的那部分代码之后就报错了,过滤器的代码是:
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
                SingleColumnValueFilter beginTimeFilter = new SingleColumnValueFilter(Bytes.toBytes(family),
                                                                                                        Bytes.toBytes(addTimeColumn),
                                                                                                        CompareOp.GREATER_OR_EQUAL,
                                                                                                        Bytes.toBytes(Long.parseLong("20140910" + BEGINTIME_OF_DAY)));
                SingleColumnValueFilter endTimeFilter = new SingleColumnValueFilter(Bytes.toBytes(family),
                                Bytes.toBytes(addTimeColumn),
                                CompareOp.LESS_OR_EQUAL,
                                Bytes.toBytes(Long.parseLong("20140910" + ENDTIME_OF_DAY)));
                filterList.addFilter(beginTimeFilter);
                filterList.addFilter(endTimeFilter);
                scan.setFilter(filterList);

一开始是上面的两个时间越界了,后来改成20140910之后时间上没有再越界,但还是报同样的错误,求大神指点
回复

使用道具 举报

howtodown 发表于 2014-11-4 12:11:33
高帝斯法则 发表于 2014-11-4 09:35
JavaSparkContext context = new JavaSparkContext(
                                "spark://master1:7077", "SparkHBase");
               
你存储的时间格式什么样子的,试下下面时间格式
开始时间
20140910

结束时间
20140911
回复

使用道具 举报

高帝斯法则 发表于 2014-11-4 13:43:18
howtodown 发表于 2014-11-4 12:11
你存储的时间格式什么样子的,试下下面时间格式
开始时间
20140910

hbase中存储的时间格式是为20140910010000,过滤器函数中BEGINTIME_OF_DAY=000000,ENDTIME_OF_DAY=235959
回复

使用道具 举报

howtodown 发表于 2014-11-4 14:38:19
高帝斯法则 发表于 2014-11-4 13:43
hbase中存储的时间格式是为20140910010000,过滤器函数中BEGINTIME_OF_DAY=000000,ENDTIME_OF_DAY=23595 ...
改成这个试试:
ENDTIME_OF_DAY=000003
回复

使用道具 举报

高帝斯法则 发表于 2014-11-4 15:01:44
回复

使用道具 举报

howtodown 发表于 2014-11-4 17:48:28
你采用的方式,是自己写的,还是参考的什么文章

采用这种形式把。
scan.setStartRow(Bytes.toBytes("195861-1035177490"));
        scan.setStopRow(Bytes.toBytes("195861-1072173147"));
        scan.addFamily(Bytes.toBytes("info"));
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("levelCode"));


详细参考
spark使用java读取hbase数据做分布式计算

回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条