Hmm, this problem feels really strange to me; maybe I've overlooked something again.
As in the screenshot above, at that point rdd.saveAsNewAPIHadoopFile compiled without any problem, but an error was thrown at runtime.
It turned out to be an index-out-of-range error inside convertToHbasePut, caused by an empty record being passed in, so I wanted to add an is-empty check in the map step,
as shown below.
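(This is only a sketch of the change, not the exact code, which was in a screenshot; stream stands in for the actual input DStream[String]:)

// reconstruction of the added check (sketch, not the exact code)
// stream is a placeholder for the real input DStream[String]
val result = stream.map(line => {
  if (line != null && line.nonEmpty) {
    convertToHbasePut(line, splitstr1)
  }
  // no else branch here: one path yields Unit, so the element type is inferred as Any
})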
But after adding the if check, the saveAsNewAPIHadoopFile statement below no longer compiles (so it cannot run either), and the error is:
Error:(185, 56) value saveAsNewAPIHadoopFile is not a member of org.apache.spark.rdd.RDD[Any]
result.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    rdd.saveAsNewAPIHadoopFile(
      "",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[MultiTableOutputFormat],
      jobConf)
  }
})
The convertToHbasePut part is as follows:
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Splits one input line into table name, row key, column family, qualifier and value,
// then builds the (ImmutableBytesWritable, Put) pair expected by MultiTableOutputFormat.
def convertToHbasePut(line: String, splitstr1: String): (ImmutableBytesWritable, Put) = {
  val line_list = line.split(splitstr1)
  val tablename = line_list(0)
  val rowKey = line_list(1)
  val cols = line_list(2)
  val col = line_list(3)
  val value = line_list(4)
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes(cols), Bytes.toBytes(col), Bytes.toBytes(value))
  // the table name goes into the key so MultiTableOutputFormat can route the Put
  (new ImmutableBytesWritable(Bytes.toBytes(tablename)), put)
}
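For completeness, before I added the check, result was built roughly like this (again a sketch; stream is a placeholder for the actual DStream[String]), which gives a DStream[(ImmutableBytesWritable, Put)] and let saveAsNewAPIHadoopFile compile:

// wiring before the check was added (sketch; stream is a placeholder)
val result = stream.map(line => convertToHbasePut(line, splitstr1))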