import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.DataFrame

def loadData1(dbDF: DataFrame, tablename: String, family: String, col: String) = {
  // Keep only the rowkey column (index 3) and the value column (index 45).
  val readFile = dbDF.rdd.map(x => Array(x(3), x(45)))
  readFile.foreachPartition { partition =>
    // One HTable per partition, not per record.
    val myConf = HBaseConfiguration.create()
    myConf.set("hbase.zookeeper.quorum", "hadoop001,hadoop002,hadoop003")
    myConf.set("hbase.zookeeper.property.clientPort", "2181")
    myConf.set("hbase.defaults.for.version.skip", "true")
    val myTable = new HTable(myConf, TableName.valueOf(tablename))
    myTable.setAutoFlush(false, false)           // key point 1: buffer puts client-side
    myTable.setWriteBufferSize(3 * 1024 * 1024)  // key point 2: 3 MB write buffer
    partition.foreach { y =>
      if (y(0).toString == "123") {
        println(y(0) + ":::" + y(1))  // debug output for one sample rowkey
      }
      val p = new Put(Bytes.toBytes(y(0).toString))
      p.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), Bytes.toBytes(y(1).toString))
      myTable.put(p)
    }
    myTable.flushCommits()  // key point 3: flush the buffered puts
    myTable.close()
  }
}
def loadData(dbDF: DataFrame, tablename: String, jobConf: JobConf, family: String, col: String) = {
  // Write through the old mapred API: TableOutputFormat + saveAsHadoopDataset.
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
  jobConf.setOutputFormat(classOf[TableOutputFormat])
  val readFile = dbDF.rdd.map(x => Array(x(3), x(45)))
  readFile.map { x =>
    val put = new Put(Bytes.toBytes(x(0).toString))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), Bytes.toBytes(x(1).toString))
    (new ImmutableBytesWritable, put)
  }.saveAsHadoopDataset(jobConf)
}
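
For reference, a minimal sketch of how the second variant might be invoked; the SparkSession name spark and the table/family/qualifier strings here are placeholder assumptions, not the real values:

// Hypothetical invocation; "spark", "my_table", "cf", and "value" are placeholders.
val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
jobConf.set("hbase.zookeeper.quorum", "hadoop001,hadoop002,hadoop003")
jobConf.set("hbase.zookeeper.property.clientPort", "2181")
loadData(dbDF, "my_table", jobConf, "cf", "value")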
I've tried both approaches and they give the same result... really frustrating...