思路有点乱,应该受到了原贴的影响。其实楼主的思路跟下面,是完全不一样的思路。就不需要使用KeyValue
[mw_shl_code=scala,true]val conf = HBaseConfiguration.create();val tableName = "data1"
val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table)
val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}}
rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration())
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)[/mw_shl_code]
println(rowKey + " f." + schema(i) + ":" + x.getAs(i));,在这里你已经获取了他们的值,为何还需要使用keyvalue。你获取值之后,直接使用put插入就可以了。所以放弃keyvalue,因为你根本不需要。只有在使用saveAsNewAPIHadoopFile的时候,才需要使用。
#####
如果你使用了saveAsNewAPIHadoopFile。那么肯定是使用的下面初始化函数
KeyValue(byte[] row, byte[] family, byte[] qualifier, byte[] value),也就是如下:
KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));
Bytes.toBytes(x._1)是row,Bytes.toBytes("v")是列簇,Bytes.toBytes("value")这里应该是列名, Bytes.toBytes(x._2+"")为value值。
new KeyValue(Bytes.toBytes(x._1),
Bytes.toBytes("v"), Bytes.toBytes("value"),
Bytes.toBytes(x._2 + ""));
对于你上面代码,自己核实下。
Bytes.toBytes(x._1)是否为row, Bytes.toBytes("v")是否为列簇,Bytes.toBytes("value")是否为列名, Bytes.toBytes(x._2 + ""))是否为列值。如果都是那就初始化成功了,你直接使用即可。如果你有多个列,那么这里你Bytes.toBytes("value"),替换为 Bytes.toBytes(schema(i)),感觉没有错误,说不识别是出现了什么问题??具体错误看下。 |