分享

spark使用java读取hbase数据做分布式计算

rsgg03 2014-6-26 11:48:47 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 92824
问题导读:
1.如何初始化sparkContext?
2.如何设置查询条件?
3.如何获得hbase查询结果Result?






由于spark提供的hbaseTest是scala版本,并没有提供java版。我将scala版本改为java版本,并根据数据做了些计算操作。

程序目的:查询出hbase满足条件的用户,统计各个等级个数。

代码如下,西面使用的hbase是0.94注释已经写详细:


  1. package com.sdyc.ndspark.sys;
  2. import org.apache.commons.logging.Log;
  3. import org.apache.commons.logging.LogFactory;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.hbase.HBaseConfiguration;
  6. import org.apache.hadoop.hbase.client.Result;
  7. import org.apache.hadoop.hbase.client.Scan;
  8. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  9. import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
  10. import org.apache.hadoop.hbase.util.Base64;
  11. import org.apache.hadoop.hbase.util.Bytes;
  12. import org.apache.spark.api.java.JavaPairRDD;
  13. import org.apache.spark.api.java.JavaSparkContext;
  14. import org.apache.spark.api.java.function.Function2;
  15. import org.apache.spark.api.java.function.PairFunction;
  16. import scala.Tuple2;
  17. import java.io.ByteArrayOutputStream;
  18. import java.io.DataOutputStream;
  19. import java.io.IOException;
  20. import java.io.Serializable;
  21. import java.util.List;
  22. /**
  23. * <pre>
  24. *
  25. * spark hbase 测试
  26. *
  27. *  Created with IntelliJ IDEA.
  28. * User: zhangdonghao
  29. * Date: 14-1-26
  30. * Time: 上午9:24
  31. * To change this template use File | Settings | File Templates.
  32. * </pre>
  33. *
  34. * @author zhangdonghao
  35. */
  36. public class HbaseTest implements Serializable {
  37.     public Log log = LogFactory.getLog(HbaseTest.class);
  38.     /**
  39.      * 将scan编码,该方法copy自 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
  40.      *
  41.      * @param scan
  42.      * @return
  43.      * @throws IOException
  44.      */
  45.     static String convertScanToString(Scan scan) throws IOException {
  46.         ByteArrayOutputStream out = new ByteArrayOutputStream();
  47.         DataOutputStream dos = new DataOutputStream(out);
  48.         scan.write(dos);
  49.         return Base64.encodeBytes(out.toByteArray());
  50.     }
  51.     public void start() {
  52.         //初始化sparkContext,这里必须在jars参数里面放上Hbase的jar,
  53.         // 否则会报unread block data异常
  54.         JavaSparkContext sc = new JavaSparkContext("spark://nowledgedata-n3:7077", "hbaseTest",
  55.                 "/home/hadoop/software/spark-0.8.1",
  56.                 new String[]{"target/ndspark.jar", "target\\dependency\\hbase-0.94.6.jar"});
  57.         //使用HBaseConfiguration.create()生成Configuration
  58.         // 必须在项目classpath下放上hadoop以及hbase的配置文件。
  59.         Configuration conf = HBaseConfiguration.create();
  60.         //设置查询条件,这里值返回用户的等级
  61.         Scan scan = new Scan();
  62.         scan.setStartRow(Bytes.toBytes("195861-1035177490"));
  63.         scan.setStopRow(Bytes.toBytes("195861-1072173147"));
  64.         scan.addFamily(Bytes.toBytes("info"));
  65.         scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("levelCode"));
  66.         try {
  67.             //需要读取的hbase表名
  68.             String tableName = "usertable";
  69.             conf.set(TableInputFormat.INPUT_TABLE, tableName);
  70.             conf.set(TableInputFormat.SCAN, convertScanToString(scan));
  71.             //获得hbase查询结果Result
  72.             JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
  73.                     TableInputFormat.class, ImmutableBytesWritable.class,
  74.                     Result.class);
  75.             //从result中取出用户的等级,并且每一个算一次
  76.             JavaPairRDD<Integer, Integer> levels = hBaseRDD.map(
  77.                     new PairFunction<Tuple2<ImmutableBytesWritable, Result>, Integer, Integer>() {
  78.                         @Override
  79.                         public Tuple2<Integer, Integer> call(
  80.                                 Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2)
  81.                                 throws Exception {
  82.                             byte[] o = immutableBytesWritableResultTuple2._2().getValue(
  83.                                     Bytes.toBytes("info"), Bytes.toBytes("levelCode"));
  84.                             if (o != null) {
  85.                                 return new Tuple2<Integer, Integer>(Bytes.toInt(o), 1);
  86.                             }
  87.                             return null;
  88.                         }
  89.                     });
  90.             //数据累加
  91.             JavaPairRDD<Integer, Integer> counts = levels.reduceByKey(new Function2<Integer, Integer, Integer>() {
  92.                 public Integer call(Integer i1, Integer i2) {
  93.                     return i1 + i2;
  94.                 }
  95.             });
  96.             
  97.             //打印出最终结果
  98.             List<Tuple2<Integer, Integer>> output = counts.collect();
  99.             for (Tuple2 tuple : output) {
  100.                 System.out.println(tuple._1 + ": " + tuple._2);
  101.             }
  102.         } catch (Exception e) {
  103.             log.warn(e);
  104.         }
  105.     }
  106.     /**
  107.      * spark如果计算没写在main里面,实现的类必须继承Serializable接口,<br>
  108.      * </>否则会报 Task not serializable: java.io.NotSerializableException 异常
  109.      */
  110.     public static void main(String[] args) throws InterruptedException {
  111.         new HbaseTest().start();
  112.         System.exit(0);
  113.     }
  114. }
复制代码
注意:如果使用的是hbase0.96.1.1-hadoop2

convertScanToString函数需要改为:


  1. /**
  2. * 将scan编码,该方法copy自 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
  3. *
  4. * @param scan
  5. * @return
  6. * @throws IOException
  7. */
  8. static String convertScanToString(Scan scan) throws IOException {
  9.     ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
  10.     return Base64.encodeBytes(proto.toByteArray());
  11. }
复制代码

运行结果如下:

  1. 0: 28528
  2. 11: 708
  3. 4: 28656
  4. 2: 36315
  5. 6: 23848
  6. 8: 19802
  7. 10: 6913
  8. 9: 15988
  9. 3: 31950
  10. 1: 38872
  11. 7: 21600
  12. 5: 27190
  13. 12: 17
复制代码









作者:张东昊
http://my.oschina.net/132722/blog/196350


欢迎大家如about云官方群371358502,更新咨询,更新资源,随时关注

已有(5)人评论

跳转到指定楼层
guxingyu 发表于 2015-3-20 14:57:14
楼主,我用上面的代码运行,出下面的错误啊
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
ImmutableBytesWritable该类没有序列化,这个要怎么才能序列化啊
回复

使用道具 举报

zhanggl 发表于 2015-3-31 11:06:21
楼主你确定代码能运行?
第86行:JavaPairRDD<Integer, Integer> levels = hBaseRDD.map(
有问题吧
回复

使用道具 举报

lyxing 发表于 2015-12-25 10:42:53
86行我也报错:
回复

使用道具 举报

chimes298 发表于 2016-3-8 14:53:01
guxingyu 发表于 2015-3-20 14:57
楼主,我用上面的代码运行,出下面的错误啊
java.io.NotSerializableException: org.apache.hadoop.hbase.io ...

启动配置加--conf spark.serializer=org.apache.spark.serializer.KryoSerializer试试
回复

使用道具 举报

ml32 发表于 2016-3-23 20:33:34
spark使用java读取hbase数据做分布式计算
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条