Hi everyone. My code is written, but I'm running into that old familiar problem: Caused by: java.io.IOException: Added a key not lexically larger than previous. Current cell = 0|000024481100|10_000024481100|10_00000185_020160907135900/f:AGE/1519957986116/Put/vlen=1/seqid=0, lastCell = 0|000024481100|10_000024481100|10_00000185_020160907135900/f:WARD_NAME/1519957986116/Put/vlen=8/seqid=0
Supposedly this means the rowkeys/KeyValues aren't sorted properly, but I don't know how they are supposed to be sorted; I've already sorted them several times.
My code is below, followed by a rough sketch of how I currently understand the sort requirement. If anyone with experience can point me in the right direction, I'd be very grateful.
import java.util.Properties

import org.apache.commons.lang3.StringUtils // assuming commons-lang3; swap for org.apache.commons.lang.StringUtils if that is what's on the classpath
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer

object BulkToHBase {
  def main(args: Array[String]): Unit = {
    // register the HBase classes that get shuffled with Kryo
    val kryoClasses: Array[Class[_]] = Array(classOf[ImmutableBytesWritable], classOf[KeyValue],
      classOf[Put], classOf[ImmutableBytesWritable.Comparator])
    val sparkConf = new SparkConf().setAppName("BulkToHBase").setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.referenceTracking", "false")
      .registerKryoClasses(kryoClasses)
    val spark = SparkSession.builder.config(sparkConf).getOrCreate
    spark.sparkContext.setLogLevel("warn")
    val connectionProperties = new Properties()
    connectionProperties.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    val predicates = Array[String](
      "RIGHT(EXAM_RESULT_ID,1) = '0' ",
      "RIGHT(EXAM_RESULT_ID,1) = '1' ",
      "RIGHT(EXAM_RESULT_ID,1) = '2' ",
      "RIGHT(EXAM_RESULT_ID,1) = '3' ",
      "RIGHT(EXAM_RESULT_ID,1) = '4' ",
      "RIGHT(EXAM_RESULT_ID,1) = '5' ",
      "RIGHT(EXAM_RESULT_ID,1) = '6' ",
      "RIGHT(EXAM_RESULT_ID,1) = '7' ",
      "RIGHT(EXAM_RESULT_ID,1) = '8' ",
      "RIGHT(EXAM_RESULT_ID,1) = '9' ")
    val url = "jdbc:sqlserver://192.168.1.21;username=sa;password=yishidb;database=CDRDB16;useUnicode=true&characterEncoding=UTF-8"
    val tableName = "DC_EXAM_RESULT"
    val dataSet = spark.read.jdbc(url, tableName, predicates, connectionProperties)
    // sort the column names so that qualifiers are emitted in lexical order within each row
    val cols: Array[String] = dataSet.columns.sorted

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master,slave01,slave02")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val tablename = "DC_EXAM_RESULT1"
    val table = new HTable(conf, tablename)
    conf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    lazy val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoad(job, table)
    // keep only rows that have all three rowkey components
    val filteredDataset = dataSet.filter(x => x.getAs[Any]("PATIENT_ID") != null &&
      x.getAs[Any]("EXAM_RESULT_ID") != null && x.getAs[Any]("ENCOUNTER_ID") != null)
    val value = filteredDataset.rdd
      .flatMap(x => {
        // one-digit salt derived from PATIENT_ID, prefixed to the rowkey
        val partition = StringUtils.leftPad(Integer.toString(Math.abs(x.getAs[Any]("PATIENT_ID").hashCode % 10)), 1, '0')
        val rowKey = partition + "|" + x.getAs[Any]("PATIENT_ID").toString + "|" +
          x.getAs[Any]("ENCOUNTER_ID").toString + "|" + x.getAs[Any]("EXAM_RESULT_ID").toString
        val arrayBuffer = new ArrayBuffer[(ImmutableBytesWritable, KeyValue)]
        // emit one KeyValue per non-null column; look the value up by column name
        // so it matches the (sorted) qualifier it is written under
        for (col <- cols) {
          val v = x.getAs[Any](col)
          if (v != null) {
            val kv = new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes("f"),
              Bytes.toBytes(col), Bytes.toBytes(v.toString))
            arrayBuffer += ((new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv))
          }
        }
        arrayBuffer
      })
    value.persist(StorageLevel.MEMORY_AND_DISK)
    // sort by rowkey (the ImmutableBytesWritable) before writing the HFiles;
    // this is the sort that still ends in the "not lexically larger" error
    val sortedByKey = value.sortByKey()
    sortedByKey.saveAsNewAPIHadoopFile("/tmp/data2", classOf[ImmutableBytesWritable],
      classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
    // bulk-load the generated HFiles into DC_EXAM_RESULT1
    val bulkLoader = new LoadIncrementalHFiles(conf)
    bulkLoader.doBulkLoad(new Path("/tmp/data2"), table)
    spark.stop()
  }
}
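
For what it's worth, here is how I currently understand the requirement: the cells have to arrive at HFileOutputFormat2 ordered by the full cell key (rowkey, then family, then qualifier), not just by the rowkey, otherwise cells that share a rowkey (e.g. the same rowkey produced from two source rows) can interleave their qualifiers, which matches my error (AGE arriving after WARD_NAME under the same rowkey). Is something like the following the missing piece? This is only an untested sketch: it reuses the value RDD and job from above, needs an extra import of org.apache.hadoop.hbase.CellUtil, and assumes my rowkeys and qualifiers are plain ASCII so that String order matches byte order.

    // sketch: sort on the composite (rowkey, family, qualifier) key instead of the rowkey alone
    val fullySorted = value
      .map { case (_, kv) =>
        // rebuild a composite sort key from the cell itself
        val row       = Bytes.toString(CellUtil.cloneRow(kv))
        val family    = Bytes.toString(CellUtil.cloneFamily(kv))
        val qualifier = Bytes.toString(CellUtil.cloneQualifier(kv))
        ((row, family, qualifier), kv)
      }
      .sortByKey() // lexicographic order on the (row, family, qualifier) tuple
      .map { case ((row, _, _), kv) => (new ImmutableBytesWritable(Bytes.toBytes(row)), kv) }

    fullySorted.saveAsNewAPIHadoopFile("/tmp/data2", classOf[ImmutableBytesWritable],
      classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)

If this is the right direction, is the String-based tuple key good enough, or do I need a byte-level comparison?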