JavaSparkContext context = new JavaSparkContext(
"spark://master1:7077", "SparkHBase");
String tablename = "test";
Scan scan = new Scan();
scanFilter(scan); // attach the time-range filter (code below)
Configuration conf = HBaseConfiguration.create(); // HBase config; reads hbase-site.xml
conf.set(TableInputFormat.INPUT_TABLE, tablename);
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
String scanStr = Base64.encodeBytes(proto.toByteArray());
conf.set(TableInputFormat.SCAN, scanStr);
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = context
.newAPIHadoopRDD(conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
Long count = hBaseRDD.count();
System.out.println("count: " + count);
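(A side check, sketched here rather than taken from the code above: TableInputFormat rebuilds the Scan from the Base64 string on each executor, so it is worth confirming locally that the filter survives the protobuf round trip. ProtobufUtil.toScan converts in both directions; the deserializing call declares IOException.)

// Sketch: serialize the Scan to protobuf and back, then check the filter.
ClientProtos.Scan protoCheck = ProtobufUtil.toScan(scan);
Scan rebuilt = ProtobufUtil.toScan(protoCheck); // throws IOException
System.out.println("filter preserved: " + (rebuilt.getFilter() != null));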
The error appeared right after I added the highlighted (red) portion of the code above. The filter code is:
// Both conditions must hold: begin <= addTime <= end
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
SingleColumnValueFilter beginTimeFilter = new SingleColumnValueFilter(
        Bytes.toBytes(family),
        Bytes.toBytes(addTimeColumn),
        CompareOp.GREATER_OR_EQUAL,
        Bytes.toBytes(Long.parseLong("20140910" + BEGINTIME_OF_DAY)));
SingleColumnValueFilter endTimeFilter = new SingleColumnValueFilter(
        Bytes.toBytes(family),
        Bytes.toBytes(addTimeColumn),
        CompareOp.LESS_OR_EQUAL,
        Bytes.toBytes(Long.parseLong("20140910" + ENDTIME_OF_DAY)));
filterList.addFilter(beginTimeFilter);
filterList.addFilter(endTimeFilter);
scan.setFilter(filterList);
At first the two time values above overflowed the long range; after changing the date to 20140910 they no longer overflow, but the same error is still thrown. Could anyone point me in the right direction?
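One possibility worth checking (an assumption on my part, since the stack trace isn't shown above): SingleColumnValueFilter compares raw bytes, so Bytes.toBytes(Long.parseLong(...)) only matches rows whose addTime column was written as an 8-byte long. If the column was stored as a string, the two encodings are incomparable. A minimal standalone sketch, using a hypothetical BEGINTIME_OF_DAY of "000000":

import org.apache.hadoop.hbase.util.Bytes;

public class EncodingCheck {
    public static void main(String[] args) {
        // Hypothetical value; the real BEGINTIME_OF_DAY is defined elsewhere.
        long begin = Long.parseLong("20140910" + "000000");
        byte[] asLong = Bytes.toBytes(begin);                   // 8-byte big-endian long
        byte[] asString = Bytes.toBytes(String.valueOf(begin)); // 14 ASCII bytes
        System.out.println("as long:   " + Bytes.toStringBinary(asLong));
        System.out.println("as string: " + Bytes.toStringBinary(asString));
        // The filter's byte[] must use the same encoding the writer used,
        // otherwise the GREATER_OR_EQUAL / LESS_OR_EQUAL comparison is meaningless.
    }
}

If the writer stored the value as a string, using Bytes.toBytes("20140910" + BEGINTIME_OF_DAY) in the filter keeps both sides lexicographically comparable.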