立即注册 登录
About云-梭伦科技 返回首页

zhanggl的个人空间 https://aboutyun.com/?5862 [收藏] [复制] [分享] [RSS]

日志

spark1.02怎么实现读取hbase的数据

已有 2112 次阅读2015-3-31 15:32 |个人分类:hbase| import

请问spark 怎么实现读取hbase的数据并且展示出来

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Serializable;
import scala.Tuple2;
import java.io.IOException;
import java.util.List;
public class SparkFromHbase implements Serializable {

    /**
     * copy from org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
     *
     * @param scan
     * @return
     * @throws IOException
     */
    String convertScanToString(Scan scan) throws IOException {
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        return Base64.encodeBytes(proto.toByteArray());
    }

    public void start() {
        SparkConf sparkConf = new SparkConf().setAppName("sparkReadHbase")
                .setMaster("spark://master:7077");
       // val conf = sparkConf.
        JavaSparkContext sc = new JavaSparkContext(sparkConf);


        Configuration conf = HBaseConfiguration.create();

        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("20150103"));
        scan.setStopRow(Bytes.toBytes("20150104"));
        try {

            String tableName = "test1";
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
            conf.set(TableInputFormat.SCAN, convertScanToString(scan));


            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class,
                    Result.class);
            
            @SuppressWarnings("serial")
JavaPairRDD<String, Integer> levels = hBaseRDD.mapToPair(
                    new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
                            byte[] o = immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));
                            if (o != null) {
                                return new Tuple2<String, Integer>(new String(o), 1);
                            }
                            return null;
                        }
                    });

            @SuppressWarnings("serial")
JavaPairRDD<String, Integer> counts = levels.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer i1, Integer i2) {
                            return i1 + i2;
                        }
                    });

            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2 tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }

            sc.stop();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new SparkFromHbase().start();
        System.exit(0);
    }
}

出现异常:
 in thread "main" java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
at org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:75)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:661)
at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:812)
at org.apache.spark.broadcast.HttpBroadcast.<init>(HttpBroadcast.scala:52)
at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:776)
at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:72)
at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:643)
at org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:384)
at SparkFromHbase.start(SparkFromHbase.java:70)
at SparkFromHbase.main(SparkFromHbase.java:109)

第70行就是:
 JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class,
                    Result.class);
请问什么原因

路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条