分享

问个问题,怎么利用mapreduce框架实现:利用HBase的2个表计算得出结果

pig2 发表于 2013-11-8 22:03:58 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 5046
问个问题,怎么利用mapreduce框架实现:利用HBase的2个表计算得出结果
比如举个例子:HBase中的两个表,你想要具体得到什么?我想计算这2个表的乘积,2个表存储的是2个矩阵,请大神指教。就一个列数,下面有n个标签。

已有(1)人评论

跳转到指定楼层
rsgg03 发表于 2013-11-8 22:20:49
你可以参考下面的代码
  1. import java.io.IOException;
  2. import java.io.ByteArrayOutputStream;
  3. import java.io.DataOutputStream;
  4. import java.io.ByteArrayInputStream;
  5. import java.io.DataInputStream;
  6. import java.util.Map;
  7. import org.apache.hadoop.io.Writable;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.hbase.HBaseConfiguration;
  10. import org.apache.hadoop.hbase.HTableDescriptor;
  11. import org.apache.hadoop.hbase.HColumnDescriptor;
  12. import org.apache.hadoop.hbase.client.HBaseAdmin;
  13. import org.apache.hadoop.hbase.client.HTable;
  14. import org.apache.hadoop.hbase.io.BatchUpdate;
  15. import org.apache.hadoop.hbase.io.RowResult;
  16. import org.apache.hadoop.hbase.io.Cell;
  17. import org.apache.hadoop.hbase.util.Writables;
  18. public class HBaseBasic {
  19. public static void main(String[] args) throws Exception {
  20. HBaseConfiguration config = new HBaseConfiguration();
  21. HBaseAdmin admin = new HBaseAdmin(config);
  22. if (admin.tableExists("scores")) {
  23. System.out.println("drop table");
  24. admin.disableTable("scores");
  25. admin.deleteTable("scores");
  26. }
  27. System.out.println("create table");
  28. HTableDescriptor tableDescripter = new HTableDescriptor("scores".getBytes());
  29. tableDescripter.addFamily(new HColumnDescriptor("grade:"));
  30. tableDescripter.addFamily(new HColumnDescriptor("course:"));
  31. admin.createTable(tableDescripter);
  32. HTable table = new HTable(config, "scores");
  33. System.out.println("add Tom's data");
  34. BatchUpdate tomUpdate = new BatchUpdate("Tom");
  35. tomUpdate.put("grade:", Writables.getBytes(new IntWritable(1)));
  36. tomUpdate.put("course:math", Writables.getBytes(new IntWritable(87)));
  37. tomUpdate.put("course:art", Writables.getBytes(new IntWritable(97)));
  38. table.commit(tomUpdate);
  39. System.out.println("add Jerry's data");
  40. BatchUpdate jerryUpdate = new BatchUpdate("Jerry");
  41. jerryUpdate.put("grade:", Writables.getBytes(new IntWritable(2)));
  42. jerryUpdate.put("course:math", Writables.getBytes(new IntWritable(100)));
  43. jerryUpdate.put("course:art", Writables.getBytes(new IntWritable(80)));
  44. table.commit(jerryUpdate);
  45. for (RowResult row : table.getScanner(new String[] { "course:" })) {
  46. System.out.format("ROW\t%s\n", new String(row.getRow()));
  47. for (Map.Entry<byte[], Cell> entry : row.entrySet()) {
  48. String column = new String(entry.getKey());
  49. Cell cell = entry.getValue();
  50. IntWritable value = new IntWritable();
  51. Writables.copyWritable(cell.getValue(), value);
  52. System.out.format(" COLUMN\t%s\t%d\n", column, value.get());
  53. }
  54. }
  55. }
  56. }
  57. <DIV class=blockcode>
  58. <BLOCKQUOTE>/**
  59. * 向指定的表插入单个Put对象
  60. *
  61. * @param tablename
  62. * @param conf
  63. * @throws Exception
  64. */
  65. public static void insertData(String tableName, HBaseConfiguration conf) {
  66. HTable table = null;
  67. try {
  68. if (table == null) {
  69. table = new HTable(conf, tableName);
  70. }
  71. // 这里我使用time+6位随机数为row关键字,确保不重复
  72. String rowname = System.currentTimeMillis() / 1000 + "" + CommUtil.getSixRadom();
  73. System.out.println("rowname = " + rowname);
  74. Put p = new Put(Bytes.toBytes(rowname));
  75. p.add("acc".getBytes(), new Long(System.currentTimeMillis()).longValue(), "大绝招".getBytes());
  76. p.add("pwd".getBytes(), new Long(System.currentTimeMillis()).longValue(), "123456".getBytes());
  77. p.add("sex".getBytes(), new Long(System.currentTimeMillis()).longValue(), "1".getBytes());
  78. p.add("age".getBytes(), new Long(System.currentTimeMillis()).longValue(), "2222".getBytes());
  79. table.put(p);
  80. } catch (Exception e) {
  81. e.printStackTrace();
  82. } finally {
  83. CommUtil.HBaseClose(table);
  84. }
  85. }
复制代码
  1. public class InsertDataToHBase {
  2. public static class InsertDataToHBaseMapper extends Mapper<Object, Text, NullContext, NullWritable> {
  3. public static String table1[] = { "field1", "field2", "field3"};
  4. public static String table2[] = { "field1", "field2", "field3"};
  5. public static String table3[] = { "field1", "field2", "field3"};
  6. public static HTable table = null;
  7. protected void setup(Context context) throws IOException, InterruptedException {
  8. HBaseConfiguration conf = new HBaseConfiguration();
  9. String table_name = context.getConfiguration().get("tabel_name");
  10. if (table == null) {
  11. table = new HTable(conf, table_name);
  12. }
  13. }
  14. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  15. String arr_value[] = value.toString().split("\t");
  16. String table_name = context.getConfiguration().get("tabel_name");
  17. String temp_arr[] = table1;
  18. int temp_value_length = 0;
  19. if (table_name.trim().equals("table1")) {
  20. temp_arr = table1;
  21. temp_value_length = 3;
  22. } else if (table_name.trim().equals("table2")) {
  23. temp_arr = table2;
  24. temp_value_length = 3;
  25. } else if (table_name.trim().equals("table3")) {
  26. temp_arr = table3;
  27. temp_value_length = 3;
  28. }
  29. List<Put> list = new ArrayList<Put>();
  30. if (arr_value.length == temp_value_length) {
  31. String rowname = System.currentTimeMillis() / 1000 + "" + CommUtil.getSixRadom();
  32. Put p = new Put(Bytes.toBytes(rowname));
  33. for (int i = 0; i < temp_arr.length; i++) {
  34. p.add(temp_arr[i].getBytes(), "".getBytes(), arr_value[i].getBytes());
  35. }
  36. list.add(p);
  37. }
  38. table.put(list);
  39. table.flushCommits();
  40. }
  41. }
  42. public static void main(String[] args) throws Exception {
  43. Configuration conf = new Configuration();
  44. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  45. if (otherArgs.length != 3) {
  46. System.err.println("Usage: InsertDataToHBase <inpath> <outpath> <tablename>");
  47. System.exit(2);
  48. }
  49. conf.set("tabel_name", otherArgs[2]);
  50. Job job = new Job(conf, "InsertDataToHBase");
  51. job.setNumReduceTasks(0);
  52. job.setJarByClass(InsertDataToHBase.class);
  53. job.setMapperClass(InsertDataToHBaseMapper.class);
  54. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  55. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  56. // job.submit();
  57. System.exit(job.waitForCompletion(true) ? 0 : 1);
  58. }
  59. }
复制代码
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条