分享

求助:数据导入hbase

请问各位大神:

        有没有用spark将linux本地的csv格式文件以该文件的第一列作为rowkey导入到hbase的方法或者工具。

已有(7)人评论

跳转到指定楼层
qcbb001 发表于 2017-7-25 08:52:57
无论是spark,还是storm。想操作hbase,都必须使用hbase api.
不过也有一个办法,就是hbase整合hive。插入到hive,在hbase也可以查询到。

回复

使用道具 举报

remarkzhao 发表于 2017-7-25 08:54:49
qcbb001 发表于 2017-7-25 08:52
无论是spark,还是storm。想操作hbase,都必须使用hbase api.
不过也有一个办法,就是hbase整合hive。插入 ...

懂得,hbase api是必须要用到,无论用什么样的方式。我想避开mapreduce
回复

使用道具 举报

qcbb001 发表于 2017-7-25 08:58:16
推荐参考csv导入HBase




回复

使用道具 举报

qcbb001 发表于 2017-7-25 08:59:16
remarkzhao 发表于 2017-7-25 08:54
懂得,hbase api是必须要用到,无论用什么样的方式。我想避开mapreduce

可以避开的,hbase很多导入方式。
使用bulkload 即可
hbase数据快速导入方案--bulkload
http://www.aboutyun.com/forum.php?mod=viewthread&tid=8418


回复

使用道具 举报

remarkzhao 发表于 2017-7-25 09:01:09
qcbb001 发表于 2017-7-25 08:59
可以避开的,hbase很多导入方式。
使用bulkload 即可
hbase数据快速导入方案--bulkload

有没有可能用RDD的方式导入?我想尽量用spark的方式 因为这样快点。
回复

使用道具 举报

starrycheng 发表于 2017-7-25 14:38:11
本帖最后由 starrycheng 于 2017-7-25 14:40 编辑
remarkzhao 发表于 2017-7-25 09:01
有没有可能用RDD的方式导入?我想尽量用spark的方式 因为这样快点。

可以的有两种办法:
1.通过循环rdd,也就是将rdd数据取出来,然后插入。本质其实还是通过hbase api
2.利用Spark Rdd生成Hfile直接导入到Hbase

#########################################
详细第一种办法:下面仅供参考
[mw_shl_code=scala,true]def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
  val sc = new SparkContext(sparkConf)

  val conf = HBaseConfiguration.create()
  //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
  conf.set("hbase.zookeeper.quorum", "localhost")
  //设置zookeeper连接端口,默认2181
  conf.set("hbase.zookeeper.property.clientPort", "2181")

  val tablename = "account"

  //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
  //使用Hadoop支持的文件系统格式
  val jobConf = new JobConf(conf)
  //设置输出的格式
  jobConf.setOutputFormat(classOf[TableOutputFormat])
  //设置输出的表名
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

  val indataRDD = sc.makeRDD(Array("abc123,May,15", "abc124,luj,16", "bcd121,jual,16"))

  val rdd: RDD[(ImmutableBytesWritable, Put)] = indataRDD.map(_.split(',')).map { arr => {
    /*一个Put对象就是一行记录,在构造方法中指定主键
     * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
     * Put.add方法接收三个参数:列族,列名,数据
     */
    val put = new Put(Bytes.toBytes(arr(0).toString))
    put.addColumn(Bytes.toBytes("cf"), "name".getBytes, Bytes.toBytes(arr(1)))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2)))
    //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
    (new ImmutableBytesWritable, put)
  }
  }

  val finalRdd = indataRDD.map(x => {
    val pp = x.split(",")
    convertRDD((pp(0), pp(1), pp(2)))
  })

  rdd.saveAsHadoopDataset(jobConf)


  sc.stop()
}

def convertRDD(triple: (String, String, String)) = {

  val p = new Put(triple._1.getBytes)
  p.addColumn("cf".getBytes, "name".getBytes, triple._2.getBytes)
  p.addColumn("cf".getBytes, "name".getBytes, triple._3.getBytes)
  (new ImmutableBytesWritable, p)

}
def createTable(tableName: String, familCol: Seq[String]) = {

  val conn = getConnection()
  val admin = conn.getAdmin
  val tbName = TableName.valueOf(tableName)
  val hdp = new HTableDescriptor(tbName)
  if (!admin.tableExists(tbName)) {
    familCol.foreach(x => {
      hdp.addFamily(new HColumnDescriptor(x.getBytes))
    })
  }
  admin.createTable(hdp)
  conn.close()
}

def getConnection(): Connection = {

  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set("hbase.zookeeper.quorum", "localhost")
  conf.set("hbase.master", "127.0.0.1:60000")
  //Connection 的创建是个重量级的工作,线程安全,是操作hbase的入口
  val conn = ConnectionFactory.createConnection(conf)
  conn
}
def dropTable(tableName: String): Unit = {
  val conn = getConnection()
  val admin = conn.getAdmin
  val tbName = TableName.valueOf(tableName)
  if (admin.tableExists(tbName)) {
    admin.disableTable(tbName)
    admin.deleteTable(tbName)

  }
  conn.close()

}

def addRow2Table(tableName: String, rowkey: String, familCol: String, qualifer: String, colvalue: String) = {
  val tbName = TableName.valueOf(tableName)
  val conn = getConnection()
  val table = conn.getTable(tbName)
  val put = new Put(rowkey.getBytes)
  put.addColumn(familCol.getBytes, qualifer.getBytes, colvalue.getBytes)
  table.put(put)
}
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("hbase基本使用").setMaster("local")
    implicit val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    import sqlcontext.implicits._
//    dropTable("account")
//    createTable("account", Seq("cf"))
    val tableRDD = Hbase2RDD("account").cache()

    println("allData:" + tableRDD.count())
    tableRDD.foreach { case (_, resutl) => {
      //      val rowkye = Bytes.toInt(resutl.getRow)
      //      val age = Bytes.toInt(resutl.getValue("cf".getBytes, "age".getBytes))
      //      val name = Bytes.toString(resutl.getValue("cf".getBytes, "name".getBytes))
      //      println("rowkye:" + rowkye + "----" + "age:" + age + "---" + "name:" + name)

      val cell = resutl.rawCells()
      println("rowKey:" + new String(resutl.getRow))
      cell.foreach(x => {
        val colFamily = new String(CellUtil.cloneFamily(x))
        val colQualifier = Bytes.toString(CellUtil.cloneQualifier(x))
        val colValue = Bytes.toString(CellUtil.cloneValue(x))
        println("colFamily: " + colFamily + "  colQualifier:" + colQualifier + "  colValue:" + colValue)
      })
    }
    }[/mw_shl_code]

来自:UnionIBM

####################
第二种办法:

针对大批量插入Hbase的场景,如果单条记录插入的时候效率比较低下,如果可以利用Rdd生成Hfile的话,然后利用Bulk Load导入Hfile的话,则会大大提升导入的速度,废话不说,直接上代码:
1.利用Create创建表blog:create 'blog' ,'article'
1.jpg
2.创建数据文件 blog.txt
2.jpg
3.上传文件至hdfs
3.jpg
备注:因为之前文件已经上传了
4.Java版本代码


[mw_shl_code=java,true]
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Created by WangLiang on 2015/11/30.
*/
public class Test {
    private static Logger log = Logger.getLogger(HelloWorld.class);
    public static void main(String[] args) {
        try {
            System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
                    "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
            System.setProperty("javax.xml.parsers.SAXParserFactory",
                    "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
            //项目内部自己的配置类,可以忽略,其实就是设置sparkConf,然后获取到JavaSparkContext
            String sparkMaster = Configure.instance.get("sparkMaster");
            String sparkJarAddress = Configure.instance.get("sparkJarAddress");
            String sparkExecutorMemory = Configure.instance.get("sparkExecutorMemory");
            String sparkCoresMax = Configure.instance.get("sparkCoresMax");
            String sparkLocalDir = Configure.instance.get("sparkLocalDir");
            log.info("initialize parameters");
            log.info("sparkMaster:" + sparkMaster);
            log.info("sparkJarAddress:" + sparkJarAddress);
            log.info("sparkExecutorMemory:" + sparkExecutorMemory);
            log.info("sparkCoresMax:" + sparkCoresMax);
            log.info("sparkLocalDir:" + sparkLocalDir);
            SparkConf sparkConf = new SparkConf().setAppName("dse load application in Java");
            sparkConf.setMaster(sparkMaster);
            if (!sparkJarAddress.isEmpty() && !sparkMaster.contains("local")) {
                sparkConf.set("spark.executor.memory", sparkExecutorMemory); // 16g
                sparkConf.set("spark.scheduler.mode", "FAIR");
                sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
                sparkConf.set("spark.kryo.registrator", "com.dahua.dse3.driver.dataset.DseKryoRegistrator");
                sparkConf.set("spark.cores.max", sparkCoresMax);
                sparkConf.set("spark.akka.threads", "12");
                sparkConf.set("spark.local.dir", sparkLocalDir);
                sparkConf.set("spark.shuffle.manager", "SORT");
                sparkConf.set("spark.network.timeout", "120");
                sparkConf.set("spark.rpc.lookupTimeout", "120");
                sparkConf.set("spark.executor.extraClassPath", "/usr/dahua/spark/executelib/hbase-protocol-0.98.3-hadoop2.jar");
                sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps");
                sparkConf.set("spark.driver.extraJavaOptions", "-XX:PermSize=256M -XX:MaxPermSize=512M");
                sparkConf.set("spark.network.timeout", "120");

            }
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            if (!sparkJarAddress.isEmpty() && !sparkMaster.contains("local")) {
                jsc.addJar(sparkJarAddress);
            }

            Configuration conf = HBaseConfiguration.create();
            String zk = "172.25.3.160,172.25.3.161,172.25.3.162";
            String tableName = "blog";
            conf.set("hbase.zookeeper.quorum", zk);
            HTable table = new HTable(conf, tableName);
            conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
            Job job = Job.getInstance(conf);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
            HFileOutputFormat.configureIncrementalLoad(job, table);
            String hdfsPath = "hdfs://mycluster/raw/hfile/blog.txt";
            JavaRDD<String> lines = jsc.textFile(hdfsPath);
            JavaPairRDD<ImmutableBytesWritable,KeyValue> hfileRdd = lines.mapToPair(new PairFunction<String, ImmutableBytesWritable, KeyValue>() {
                public Tuple2<ImmutableBytesWritable, KeyValue> call(String v1) throws Exception {
                    String[] tokens = v1.split(" ");
                    String rowkey = tokens[0];
                    String content = tokens[1];
                    KeyValue keyValue = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("article"), Bytes.toBytes("value"), Bytes.toBytes(content));
                    return new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), keyValue);
                }
            });
            String hfilePath = "hdfs://mycluster/hfile/blog.hfile";
            hfileRdd.saveAsNewAPIHadoopFile(hfilePath, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
                        
                        //利用bulk load hfile
            LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
            bulkLoader.doBulkLoad(new Path(hfilePath), table);

        }catch(Exception e){
            e.printStackTrace();
        }finally {
            ;
        }
    }

}

[/mw_shl_code]

5.scan blog表,数据已经入库
4.jpg


来自:csdn 亮亮-AC米兰


回复

使用道具 举报

remarkzhao 发表于 2017-7-25 14:44:35

多谢大神了以及楼下的各位神。。
我的问题变了,我现在要从sqlserver里拉表到hbase
要求:1.用spark  2.无需提前在hbase建表 3.指定sqlserver中拉出来的表的第三列为hbase rowkey   

PS:刚才看了一下phoenix,似乎可以满足我的要求,但是row key是自动的sqlserver里拉出来的表的第一列
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条