分享

hbase内存溢出的问题。

ld512870 发表于 2015-4-22 21:04:11 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 54615
        使用hbase入库,遇到了问题,入库的时候,会出现warn:too many storefile...然后过会就出现toobusyException, memstore > blocking mem....就是大于我们设置的内存。我调整参数,增大了memstore的大小,以及slot数,以及保留的stores数量,以及等待时间。。可是还是会出现这种情况。。这是为什么啊?用那么多mapreduce任务线程入库的时候也没有发现这种情况,为什么单独入库的时候还会这样?还有就是我有三个regionserver,但是入库的时候只是一个reginserver的cpu很高400%,然后其他两个0.。这个应该是因为没有预先创建region分区的原因。。还有就是我用的cloudera manager监控的集群,总是警告说使用了交换区。。。我的swapness也设置成0了。怎么会出现使用交换区的情况呢?还有十几G的内存可以用啊。这个怎么整?

已有(4)人评论

跳转到指定楼层
hyj 发表于 2015-4-22 21:43:28

hbase api常用方法使用及预分区解决热点问题

本帖最后由 hyj 于 2015-4-22 22:00 编辑





下面是一些相关代码,希望对楼主有所帮助
API 操作:
  1. import java.io.IOException;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.hbase.HBaseConfiguration;
  6. import org.apache.hadoop.hbase.HColumnDescriptor;
  7. import org.apache.hadoop.hbase.HTableDescriptor;
  8. import org.apache.hadoop.hbase.KeyValue;
  9. import org.apache.hadoop.hbase.MasterNotRunningException;
  10. import org.apache.hadoop.hbase.TableName;
  11. import org.apache.hadoop.hbase.ZooKeeperConnectionException;
  12. import org.apache.hadoop.hbase.client.Get;
  13. import org.apache.hadoop.hbase.client.HBaseAdmin;
  14. import org.apache.hadoop.hbase.client.HTable;
  15. import org.apache.hadoop.hbase.client.HTablePool;
  16. import org.apache.hadoop.hbase.client.Put;
  17. import org.apache.hadoop.hbase.client.Result;
  18. import org.apache.hadoop.hbase.client.ResultScanner;
  19. import org.apache.hadoop.hbase.client.Scan;
  20. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
  21. import org.apache.hadoop.hbase.filter.Filter;
  22. import org.apache.hadoop.hbase.filter.FilterList;
  23. import org.apache.hadoop.hbase.filter.PrefixFilter;
  24. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  25. import org.apache.hadoop.hbase.util.Bytes;
  26. import org.slf4j.Logger;
  27. import org.slf4j.LoggerFactory;
  28. import com.kktest.hbase.HashChoreWoker;
  29. import com.kktest.hbase.HashRowKeyGenerator;
  30. import com.kktest.hbase.RowKeyGenerator;
  31. import com.kktest.hbase.BitUtils;
  32. /**
  33. * hbase 客户端
  34. *
  35. * @author kuang hj
  36. *
  37. */
  38. @SuppressWarnings("all")
  39. public class HBaseClient {
  40.     private static Logger logger = LoggerFactory.getLogger(HBaseClient.class);
  41.     private static Configuration config;
  42.     static {
  43.         config = HBaseConfiguration.create();
  44.         config.set("hbase.zookeeper.quorum",
  45.                 "192.168.1.100:2181,192.168.1.101:2181,192.168.1.103:2181");
  46.     }
  47.     /**
  48.      * 根据随机散列(hash)创建分区表
  49.      *
  50.      * @throws Exception
  51.      *             hash_split_table
  52.      */
  53.     public static void testHashAndCreateTable(String tableNameTmp,
  54.             String columnFamily) throws Exception {<p>        // 取随机散列 10 代表 10个分区
  55.         HashChoreWoker worker = new HashChoreWoker(1000000, 10);
  56.         byte[][] splitKeys = worker.calcSplitKeys();
  57.         HBaseAdmin admin = new HBaseAdmin(config);
  58.         TableName tableName = TableName.valueOf(tableNameTmp);
  59.         if (admin.tableExists(tableName)) {
  60.             try {
  61.                 admin.disableTable(tableName);
  62.             } catch (Exception e) {
  63.             }
  64.             admin.deleteTable(tableName);
  65.         }
  66.         HTableDescriptor tableDesc = new HTableDescriptor(tableName);
  67.         HColumnDescriptor columnDesc = new HColumnDescriptor(
  68.                 Bytes.toBytes(columnFamily));
  69.         columnDesc.setMaxVersions(1);
  70.         tableDesc.addFamily(columnDesc);
  71.         admin.createTable(tableDesc, splitKeys);
  72.         admin.close();
  73.     }
  74.     /**
  75.      * @Title: queryData
  76.      * @Description: 从HBase查询出数据
  77.      * @author kuang hj
  78.      * @param tableName
  79.      *            表名
  80.      * @param rowkey
  81.      *            rowkey
  82.      * @return 返回用户信息的list
  83.      * @throws Exception
  84.      */
  85.     @SuppressWarnings("all")
  86.     public static ArrayList<String> queryData(String tableName, String rowkey)
  87.             throws Exception {
  88.         ArrayList<String> list = new ArrayList<String>();
  89.         logger.info("开始时间");
  90.         HTable table = new HTable(config, tableName);
  91.         Get get = new Get(rowkey.getBytes()); // 根据主键查询
  92.         Result r = table.get(get);
  93.         logger.info("结束时间");
  94.         KeyValue[] kv = r.raw();
  95.         for (int i = 0; i < kv.length; i++) {
  96.             // 循环每一列
  97.             String key = kv[i].getKeyString();
  98.             
  99.             String value = kv[i].getValueArray().toString();
  100.             
  101.             // 将查询到的结果写入List中
  102.             list.add(key + ":"+ value);
  103.             
  104.         }// end of 遍历每一列
  105.         
  106.         return list;
  107.     }
  108.     /**
  109.      * 增加表数据
  110.      *
  111.      * @param tableName
  112.      * @param rowkey
  113.      */
  114.     public static void insertData(String tableName, String rowkey) {
  115.         HTable table = null;
  116.         try {
  117.             table = new HTable(config, tableName);
  118.             // 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值
  119.             for (int i = 1; i < 100; i++) {
  120.                 byte[] result = getNumRowkey(rowkey,i);
  121.                 Put put = new Put(result);
  122.                 // 本行数据的第一列
  123.                 put.add(rowkey.getBytes(), "name".getBytes(),
  124.                         ("aaa" + i).getBytes());
  125.                 // 本行数据的第三列
  126.                 put.add(rowkey.getBytes(), "age".getBytes(),
  127.                         ("bbb" + i).getBytes());
  128.                 // 本行数据的第三列
  129.                 put.add(rowkey.getBytes(), "address".getBytes(),
  130.                         ("ccc" + i).getBytes());
  131.                 table.put(put);
  132.             }
  133.         } catch (Exception e1) {
  134.             e1.printStackTrace();
  135.         }
  136.     }
  137.     private static byte[] getNewRowkey(String rowkey) {
  138.         byte[] result = null;
  139.         RowKeyGenerator rkGen = new HashRowKeyGenerator();
  140.         byte[] splitKeys = rkGen.nextId();
  141.         byte[] rowkeytmp = rowkey.getBytes();
  142.         result = new byte[splitKeys.length + rowkeytmp.length];
  143.         System.arraycopy(splitKeys, 0, result, 0, splitKeys.length);
  144.         System.arraycopy(rowkeytmp, 0, result, splitKeys.length,
  145.                 rowkeytmp.length);
  146.         return result;
  147.     }
  148.    
  149.     public static void main(String[] args) {
  150.         RowKeyGenerator rkGen = new HashRowKeyGenerator();
  151.         byte[] splitKeys = rkGen.nextId();
  152.         System.out.println(splitKeys);   
  153.     }
  154.     private static byte[] getNumRowkey(String rowkey, int i) {
  155.         byte[] result = null;
  156.         RowKeyGenerator rkGen = new HashRowKeyGenerator();
  157.         byte[] splitKeys = rkGen.nextId();
  158.         byte[] rowkeytmp = rowkey.getBytes();
  159.         byte[] intVal = BitUtils.getByteByInt(i);
  160.         result = new byte[splitKeys.length + rowkeytmp.length + intVal.length];
  161.         System.arraycopy(splitKeys, 0, result, 0, splitKeys.length);
  162.         System.arraycopy(rowkeytmp, 0, result, splitKeys.length,
  163.                 rowkeytmp.length);
  164.         System.arraycopy(intVal, 0, result, splitKeys.length+rowkeytmp.length,
  165.                 intVal.length);
  166.         return result;
  167.     }
  168.    
  169.    
  170.     /**
  171.      * 删除表
  172.      *
  173.      * @param tableName
  174.      */
  175.     public static void dropTable(String tableName) {
  176.         try {
  177.             HBaseAdmin admin = new HBaseAdmin(config);
  178.             admin.disableTable(tableName);
  179.             admin.deleteTable(tableName);
  180.         } catch (MasterNotRunningException e) {
  181.             e.printStackTrace();
  182.         } catch (ZooKeeperConnectionException e) {
  183.             e.printStackTrace();
  184.         } catch (IOException e) {
  185.             e.printStackTrace();
  186.         }
  187.     }
  188.     /**
  189.      * 查询所有
  190.      *
  191.      * @param tableName
  192.      */
  193.     public static void QueryAll(String tableName) {
  194.         HTable table  = null;
  195.         try {
  196.             table  = new HTable(config, tableName);
  197.             ResultScanner rs = table.getScanner(new Scan());
  198.             for (Result r : rs) {
  199.                 System.out.println("获得到rowkey:" + new String(r.getRow()));
  200.                 for (KeyValue keyValue : r.raw()) {
  201.                     System.out.println("列:" + new String(keyValue.getFamily())
  202.                             + "====值:" + new String(keyValue.getValue()));
  203.                 }
  204.             }
  205.         } catch (IOException e) {
  206.             e.printStackTrace();
  207.         }
  208.     }
  209.     /**
  210.      * 查询所有
  211.      *
  212.      * @param tableName
  213.      */
  214.     public static void QueryByCondition1(String tableName) {
  215.         HTable table = null;
  216.         try {
  217.             table  = new HTable(config, tableName);
  218.             Get scan = new Get("abcdef".getBytes());// 根据rowkey查询
  219.             Result r = table.get(scan);
  220.             System.out.println("获得到rowkey:" + new String(r.getRow()));
  221.             for (KeyValue keyValue : r.raw()) {
  222.                 System.out.println("列:" + new String(keyValue.getFamily())
  223.                         + "====值:" + new String(keyValue.getValue()));
  224.             }
  225.         } catch (IOException e) {
  226.             e.printStackTrace();
  227.         }
  228.     }
  229.    
  230.     /**
  231.      *  根据rowkwy前坠查询
  232.      * @param tableName
  233.      * @param rowkey
  234.      */
  235.     public static void queryByRowKey(String tableName,String rowkey)
  236.     {
  237.         try {
  238.             HTable table = new HTable(config, tableName);
  239.             Scan scan = new Scan();
  240.             scan.setFilter(new PrefixFilter(rowkey.getBytes()));
  241.             ResultScanner rs = table.getScanner(scan);
  242.             KeyValue[] kvs = null;
  243.             for (Result tmp : rs)
  244.             {
  245.                 kvs = tmp.raw();
  246.                 for (KeyValue kv : kvs)
  247.                 {
  248.                     System.out.print(kv.getRow()+" ");
  249.                     System.out.print(kv.getFamily()+" :");
  250.                     System.out.print(kv.getQualifier()+" ");
  251.                     System.out.print(kv.getTimestamp()+" ");
  252.                     System.out.println(kv.getValue());
  253.                 }
  254.             }
  255.         } catch (IOException e) {
  256.             e.printStackTrace();
  257.         }
  258.         
  259.     }
  260.     /**
  261.      * 查询所有
  262.      *
  263.      * @param tableName
  264.      */
  265.     public static void QueryByCondition2(String tableName) {
  266.         try {
  267.             HTable table = new HTable(config, tableName);
  268.             // 当列column1的值为aaa时进行查询
  269.             Filter filter = new SingleColumnValueFilter(
  270.                     Bytes.toBytes("column1"), null, CompareOp.EQUAL,
  271.                     Bytes.toBytes("aaa"));
  272.             Scan s = new Scan();
  273.             s.setFilter(filter);
  274.             ResultScanner rs = table.getScanner(s);
  275.             for (Result r : rs) {
  276.                 System.out.println("获得到rowkey:" + new String(r.getRow()));
  277.                 for (KeyValue keyValue : r.raw()) {
  278.                     System.out.println("列:" + new String(keyValue.getFamily())
  279.                             + "====值:" + new String(keyValue.getValue()));
  280.                 }
  281.             }
  282.         } catch (Exception e) {
  283.             e.printStackTrace();
  284.         }
  285.     }
  286.     /**
  287.      * 查询所有
  288.      *
  289.      * @param tableName
  290.      */
  291.     public static void QueryByCondition3(String tableName) {
  292.         try {
  293.             
  294.             HTable table = new HTable(config, tableName);
  295.             List<Filter> filters = new ArrayList<Filter>();
  296.             Filter filter1 = new SingleColumnValueFilter(
  297.                     Bytes.toBytes("column1"), null, CompareOp.EQUAL,
  298.                     Bytes.toBytes("aaa"));
  299.             filters.add(filter1);
  300.             Filter filter2 = new SingleColumnValueFilter(
  301.                     Bytes.toBytes("column2"), null, CompareOp.EQUAL,
  302.                     Bytes.toBytes("bbb"));
  303.             filters.add(filter2);
  304.             Filter filter3 = new SingleColumnValueFilter(
  305.                     Bytes.toBytes("column3"), null, CompareOp.EQUAL,
  306.                     Bytes.toBytes("ccc"));
  307.             filters.add(filter3);
  308.             FilterList filterList1 = new FilterList(filters);
  309.             Scan scan = new Scan();
  310.             scan.setFilter(filterList1);
  311.             ResultScanner rs = table.getScanner(scan);
  312.             for (Result r : rs) {
  313.                 System.out.println("获得到rowkey:" + new String(r.getRow()));
  314.                 for (KeyValue keyValue : r.raw()) {
  315.                     System.out.println("列:" + new String(keyValue.getFamily())
  316.                             + "====值:" + new String(keyValue.getValue()));
  317.                 }
  318.             }
  319.             rs.close();
  320.         } catch (Exception e) {
  321.             e.printStackTrace();
  322.         }
  323.     }
  324. }
复制代码
  1. HashChoreWoker:
  2. import java.util.Iterator;
  3. import java.util.TreeSet;
  4. import org.apache.hadoop.hbase.util.Bytes;
  5. /**
  6. *
  7. * @author kuang hj
  8. *
  9. */
  10. public class HashChoreWoker{
  11.     // 随机取机数目
  12.     private int baseRecord;
  13.     // rowkey生成器
  14.     private RowKeyGenerator rkGen;
  15.     // 取样时,由取样数目及region数相除所得的数量.
  16.     private int splitKeysBase;
  17.     // splitkeys个数
  18.     private int splitKeysNumber;
  19.     // 由抽样计算出来的splitkeys结果
  20.     private byte[][] splitKeys;
  21.     public HashChoreWoker(int baseRecord, int prepareRegions) {
  22.         this.baseRecord = baseRecord;
  23.         // 实例化rowkey生成器
  24.         rkGen = new HashRowKeyGenerator();
  25.         splitKeysNumber = prepareRegions - 1;
  26.         splitKeysBase = baseRecord / prepareRegions;
  27.     }
  28.     public byte[][] calcSplitKeys() {
  29.         splitKeys = new byte[splitKeysNumber][];
  30.         // 使用treeset保存抽样数据,已排序过
  31.         TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
  32.         for (int i = 0; i < baseRecord; i++) {
  33.             rows.add(rkGen.nextId());
  34.         }
  35.         int pointer = 0;
  36.         Iterator<byte[]> rowKeyIter = rows.iterator();
  37.         int index = 0;
  38.         while (rowKeyIter.hasNext()) {
  39.             byte[] tempRow = rowKeyIter.next();
  40.             rowKeyIter.remove();
  41.             if ((pointer != 0) && (pointer % splitKeysBase == 0)) {
  42.                 if (index < splitKeysNumber) {
  43.                     splitKeys[index] = tempRow;
  44.                     index++;
  45.                 }
  46.             }
  47.             pointer++;
  48.         }
  49.         rows.clear();
  50.         rows = null;
  51.         return splitKeys;
  52.     }
  53. }
复制代码

  1. HashRowKeyGenerator:
  2. import org.apache.hadoop.hbase.util.Bytes;
  3. import org.apache.hadoop.hbase.util.MD5Hash;
  4. import com.kktest.hbase.BitUtils;
  5. /**
  6. *
  7. *
  8. **/
  9. public class HashRowKeyGenerator implements RowKeyGenerator {
  10.     private static long currentId = 1;
  11.     private static long currentTime = System.currentTimeMillis();
  12.     //private static Random random = new Random();
  13.     public byte[] nextId()
  14.     {
  15.         try {
  16.             currentTime = getRowKeyResult(Long.MAX_VALUE - currentTime);
  17.             byte[] lowT = Bytes.copy(Bytes.toBytes(currentTime), 4, 4);
  18.             byte[] lowU = Bytes.copy(Bytes.toBytes(currentId), 4, 4);
  19.             byte[] result = Bytes.add(MD5Hash.getMD5AsHex(Bytes.add(lowT, lowU))
  20.                     .substring(0, 8).getBytes(), Bytes.toBytes(currentId));
  21.             return result;
  22.         } finally {
  23.             currentId++;
  24.         }
  25.     }
  26.    
  27.     /**
  28.      *  getRowKeyResult
  29.      * @param tmpData
  30.      * @return
  31.      */
  32.     public static long getRowKeyResult(long tmpData)
  33.     {
  34.         String str = String.valueOf(tmpData);
  35.         StringBuffer sb = new StringBuffer();
  36.         char[] charStr = str.toCharArray();
  37.         for (int i = charStr.length -1 ; i > 0; i--)
  38.         {
  39.             sb.append(charStr[i]);
  40.         }
  41.         
  42.         return Long.parseLong(sb.toString());
  43.     }
  44. }
复制代码


回复

使用道具 举报

hyj 发表于 2015-4-22 21:41:26
swapness是不是设置后,重启又恢复了
至于hbase,相信楼主已经知道问题的答案了,剩下的就是去试试了。尽量不要产生热点。





回复

使用道具 举报

NIITYZU 发表于 2015-4-23 09:09:01
hyj 发表于 2015-4-22 21:43
下面是一些相关代码,希望对楼主有所帮助
API 操作:

很强大,谢谢分享
回复

使用道具 举报

ld512870 发表于 2015-4-24 10:18:28
hyj 发表于 2015-4-22 21:41
swapness是不是设置后,重启又恢复了
至于hbase,相信楼主已经知道问题的答案了,剩下的就是去试试了。尽 ...

谢谢版主了。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条