The project had a requirement to store data from a Hive table in HBase. Spark is used to read the Hive table and write the data into HBase, and there are two ways to do the write: one is to write the data in batches through the HBase API, the other is to use BulkLoad to generate HFile files and then load them into HBase; of the two, the second is more efficient. In this post Fayson describes how to use Spark to read a Hive table and quickly import its data into HBase with BulkLoad.
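For reference, here is a minimal sketch (not taken from the code later in this post) of the first approach, writing through the HBase client API with batched Puts. The table name ods_user_hbase, the info:value column, the helper name writeWithHBaseApi and the batch size of 1000 are only illustrative:
[mw_shl_code=java,true]import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Write one partition of (rowkey, value) pairs with batched Puts through the HBase client API.
// Intended to be called from rdd.foreachPartition; table/column names are only examples.
def writeWithHBaseApi(rows: Iterator[(String, String)]): Unit = {
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("ods_user_hbase"))
  try {
    val puts = new java.util.ArrayList[Put]()
    rows.foreach { case (rowKey, value) =>
      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("value"), Bytes.toBytes(value))
      puts.add(put)
      // flush a batch of 1000 Puts to reduce RPC round trips
      if (puts.size() >= 1000) { table.put(puts); puts.clear() }
    }
    if (!puts.isEmpty) table.put(puts)
  } finally {
    table.close()
    conn.close()
  }
}
[/mw_shl_code]
The BulkLoad approach shown later skips this per-RPC write path entirely by generating HFiles and handing them to the RegionServers.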
1. Deploy the prepared hbase-spark-1.2.0-cdh5.13.1.jar to the /opt/cloudera/parcels/CDH/lib/spark/lib directory on all nodes of the cluster
[mw_shl_code=shell,true][root@cdh01 ~]# ll /opt/cloudera/parcels/CDH/lib/spark/lib/
[/mw_shl_code]
Then append the jar to SPARK_DIST_CLASSPATH so that the hbase-spark classes are visible to Spark:
[mw_shl_code=shell,true]export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/parcels/CDH/lib/spark/lib/hbase-spark-1.2.0-cdh5.13.1.jar
[/mw_shl_code]
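A quick way to confirm that the jar is actually picked up (an optional sanity check, not one of the original steps) is to start spark-shell and import the hbase-spark classes; the imports only resolve if the classpath change above took effect:
[mw_shl_code=java,true]// Run inside spark-shell on a gateway node
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
[/mw_shl_code]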
[mw_shl_code=java,true]package com.cloudera.hbase

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

import scala.collection.mutable

/**
  * package: com.cloudera.hbase
  * describe: import Hive table data into HBase using BulkLoad
  * create_user: Fayson
  * email: htechinfo@163.com
  * create_date: 2018/7/31
  * create_time: 2:04 PM
  * WeChat official account: Hadoop实操
  */
object Hive2HBase {

  def main(args: Array[String]) {
    // Field used as the HBase rowkey, ZooKeeper quorum/client port and the target HBase table name
    val rowKeyField = "id"
    val quorum = "cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com"
    val clientPort = "2181"
    val hBaseTempTable = "ods_user_hbase"

    val sparkConf = new SparkConf().setAppName("Hive2HBase")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    // Read the source data from the Hive table
    val datahiveDF = hiveContext.sql(s"select * from ods_user")

    // Column names of the Hive table
    var fields = datahiveDF.columns
    // Remove the rowkey column so only the remaining columns are written as cells
    fields = fields.filterNot(_ == rowKeyField)

    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.quorum", quorum)
    hBaseConf.set("hbase.zookeeper.property.clientPort", clientPort)

    // Create the HBase table if it does not exist yet
    createHTable(hBaseTempTable, hBaseConf)

    val hbaseContext = new HBaseContext(sc, hBaseConf)

    // Convert the DataFrame into the (rowkey, (family, qualifier, value)) records that bulkLoad expects
    val rddnew = datahiveDF.rdd.map(row => {
      val rowKey = row.getAs[String](rowKeyField)
      fields.map(field => {
        val fieldValue = row.getAs[String](field)
        (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
      })
    }).flatMap(array => array)

    // Generate the HFile files under /tmp/bulkload with HBaseContext.bulkLoad
    hbaseContext.bulkLoad[Put](rddnew.map(record => {
      val put = new Put(record._1)
      record._2.foreach(putValue => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    }), TableName.valueOf(hBaseTempTable), (t: Put) => putForLoad(t), "/tmp/bulkload")

    val conn = ConnectionFactory.createConnection(hBaseConf)
    val hbTableName = TableName.valueOf(hBaseTempTable)
    val regionLocator = conn.getRegionLocator(hbTableName)
    val realTable = conn.getTable(hbTableName)
    HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)

    // Load the generated HFiles into the HBase table
    val loader = new LoadIncrementalHFiles(hBaseConf)
    val admin = conn.getAdmin()
    loader.doBulkLoad(new Path("/tmp/bulkload"), admin, realTable, regionLocator)

    sc.stop()
  }

  /**
    * Create the HBase table if it does not already exist
    * @param tableName table name
    * @param hBaseConf HBase configuration
    */
  def createHTable(tableName: String, hBaseConf: Configuration) = {
    val connection = ConnectionFactory.createConnection(hBaseConf)
    val hBaseTableName = TableName.valueOf(tableName)
    val admin = connection.getAdmin
    if (!admin.tableExists(hBaseTableName)) {
      val tableDesc = new HTableDescriptor(hBaseTableName)
      tableDesc.addFamily(new HColumnDescriptor("info".getBytes))
      admin.createTable(tableDesc)
    }
    connection.close()
  }

  /**
    * Prepare the Put object for the bulkLoad function.
    * @param put The put object.
    * @throws java.io.IOException
    * @throws java.lang.InterruptedException
    * @return Iterator of (KeyFamilyQualifier, bytes of cell value)
    */
  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
    import scala.collection.JavaConversions._
    for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
      val family = cells.getKey
      for (value <- cells.getValue) {
        val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
        ret.+=((kfq, CellUtil.cloneValue(value)))
      }
    }
    ret.iterator
  }
}[/mw_shl_code]
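To run the job, package the Hive2HBase class above into a jar and submit it with spark-submit; the jar name below is only a placeholder for your own build artifact:
[mw_shl_code=shell,true]# spark-hive2hbase-1.0.jar is a placeholder for the packaged project jar
spark-submit --class com.cloudera.hbase.Hive2HBase \
  --master yarn --deploy-mode client \
  spark-hive2hbase-1.0.jar
[/mw_shl_code]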
2. When importing data into an HBase table with bulkload, there is a short window during which the table stops serving requests while the HFile files are being loaded (the table is disabled before the load and enabled again once the load completes).
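Once the load completes, the result can be spot-checked from the HBase shell (the table name comes from the code above):
[mw_shl_code=shell,true][root@cdh01 ~]# hbase shell
hbase(main):001:0> count 'ods_user_hbase'
hbase(main):002:0> scan 'ods_user_hbase', {LIMIT => 1}
[/mw_shl_code]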