你可以参考下面的几段示例代码（HBase 建表与读写、单条 Put 插入、通过 MapReduce 批量导入）：
- import java.io.IOException;
- import java.io.ByteArrayOutputStream;
- import java.io.DataOutputStream;
- import java.io.ByteArrayInputStream;
- import java.io.DataInputStream;
- import java.util.Map;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.HTableDescriptor;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.client.HBaseAdmin;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.io.BatchUpdate;
- import org.apache.hadoop.hbase.io.RowResult;
- import org.apache.hadoop.hbase.io.Cell;
- import org.apache.hadoop.hbase.util.Writables;
-
- public class HBaseBasic {
- public static void main(String[] args) throws Exception {
- HBaseConfiguration config = new HBaseConfiguration();
- HBaseAdmin admin = new HBaseAdmin(config);
- if (admin.tableExists("scores")) {
- System.out.println("drop table");
-
- admin.disableTable("scores");
- admin.deleteTable("scores");
- }
- System.out.println("create table");
- HTableDescriptor tableDescripter = new HTableDescriptor("scores".getBytes());
- tableDescripter.addFamily(new HColumnDescriptor("grade:"));
- tableDescripter.addFamily(new HColumnDescriptor("course:"));
- admin.createTable(tableDescripter);
- HTable table = new HTable(config, "scores");
- System.out.println("add Tom's data");
- BatchUpdate tomUpdate = new BatchUpdate("Tom");
- tomUpdate.put("grade:", Writables.getBytes(new IntWritable(1)));
- tomUpdate.put("course:math", Writables.getBytes(new IntWritable(87)));
- tomUpdate.put("course:art", Writables.getBytes(new IntWritable(97)));
- table.commit(tomUpdate);
- System.out.println("add Jerry's data");
- BatchUpdate jerryUpdate = new BatchUpdate("Jerry");
- jerryUpdate.put("grade:", Writables.getBytes(new IntWritable(2)));
- jerryUpdate.put("course:math", Writables.getBytes(new IntWritable(100)));
- jerryUpdate.put("course:art", Writables.getBytes(new IntWritable(80)));
- table.commit(jerryUpdate);
- for (RowResult row : table.getScanner(new String[] { "course:" })) {
- System.out.format("ROW\t%s\n", new String(row.getRow()));
- for (Map.Entry<byte[], Cell> entry : row.entrySet()) {
- String column = new String(entry.getKey());
- Cell cell = entry.getValue();
- IntWritable value = new IntWritable();
- Writables.copyWritable(cell.getValue(), value);
- System.out.format(" COLUMN\t%s\t%d\n", column, value.get());
-
- }
- }
- }
- }
- <DIV class=blockcode>
- <BLOCKQUOTE>/**
- * 向指定的表插入单个Put对象
- *
- * @param tablename
- * @param conf
- * @throws Exception
- */
- public static void insertData(String tableName, HBaseConfiguration conf) {
- HTable table = null;
- try {
- if (table == null) {
- table = new HTable(conf, tableName);
- }
- // 这里我使用time+6位随机数为row关键字,确保不重复
- String rowname = System.currentTimeMillis() / 1000 + "" + CommUtil.getSixRadom();
- System.out.println("rowname = " + rowname);
- Put p = new Put(Bytes.toBytes(rowname));
- p.add("acc".getBytes(), new Long(System.currentTimeMillis()).longValue(), "大绝招".getBytes());
- p.add("pwd".getBytes(), new Long(System.currentTimeMillis()).longValue(), "123456".getBytes());
- p.add("sex".getBytes(), new Long(System.currentTimeMillis()).longValue(), "1".getBytes());
- p.add("age".getBytes(), new Long(System.currentTimeMillis()).longValue(), "2222".getBytes());
- table.put(p);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- CommUtil.HBaseClose(table);
- }
- }
-
复制代码
- public class InsertDataToHBase {
- public static class InsertDataToHBaseMapper extends Mapper<Object, Text, NullContext, NullWritable> {
- public static String table1[] = { "field1", "field2", "field3"};
- public static String table2[] = { "field1", "field2", "field3"};
- public static String table3[] = { "field1", "field2", "field3"};
- public static HTable table = null;
- protected void setup(Context context) throws IOException, InterruptedException {
- HBaseConfiguration conf = new HBaseConfiguration();
- String table_name = context.getConfiguration().get("tabel_name");
- if (table == null) {
- table = new HTable(conf, table_name);
- }
- }
-
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- String arr_value[] = value.toString().split("\t");
- String table_name = context.getConfiguration().get("tabel_name");
- String temp_arr[] = table1;
- int temp_value_length = 0;
- if (table_name.trim().equals("table1")) {
- temp_arr = table1;
- temp_value_length = 3;
- } else if (table_name.trim().equals("table2")) {
- temp_arr = table2;
- temp_value_length = 3;
- } else if (table_name.trim().equals("table3")) {
- temp_arr = table3;
- temp_value_length = 3;
- }
- List<Put> list = new ArrayList<Put>();
- if (arr_value.length == temp_value_length) {
- String rowname = System.currentTimeMillis() / 1000 + "" + CommUtil.getSixRadom();
- Put p = new Put(Bytes.toBytes(rowname));
- for (int i = 0; i < temp_arr.length; i++) {
- p.add(temp_arr[i].getBytes(), "".getBytes(), arr_value[i].getBytes());
- }
- list.add(p);
- }
- table.put(list);
- table.flushCommits();
- }
- }
-
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 3) {
- System.err.println("Usage: InsertDataToHBase <inpath> <outpath> <tablename>");
- System.exit(2);
- }
- conf.set("tabel_name", otherArgs[2]);
- Job job = new Job(conf, "InsertDataToHBase");
- job.setNumReduceTasks(0);
- job.setJarByClass(InsertDataToHBase.class);
- job.setMapperClass(InsertDataToHBaseMapper.class);
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- // job.submit();
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
复制代码
|