分享

求助 rdd.saveAsNewAPIHadoopFile 的一个奇怪问题

grinsky 发表于 2017-12-22 16:45:44 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 5 15051
额这个问题个人感觉好奇怪,可能又是我哪里疏漏了什么
1.png
如上图,这个时候rdd.saveAsNewAPIHadoopFile时没问题的,但在执行时有报错,
发现是调用convertToHbasePut时indexoutofrange,被传入了一个空数据,所以想在map时加一个是否为空的判断,
如下
2.png

结果加了if判断后,下面那条saveAsNewAPIHadoopFile的语句就报错了,执行也无法通过,报错:
Error:(185, 56) value saveAsNewAPIHadoopFile is not a member of org.apache.spark.rdd.RDD[Any]
    result.foreachRDD(rdd => {if(! rdd.isEmpty()) {rdd.saveAsNewAPIHadoopFile("", classOf[ImmutableBytesWritable], classOf[Put], classOf[MultiTableOutputFormat], jobConf)}})



convertToHbasePut的部分如下:
def convertToHbasePut(line:String,splitstr1:String): (ImmutableBytesWritable, Put) = {
  val line_list = line.split(splitstr1)
  val tablename = line_list(0)
  val rowKey = line_list(1)
  val cols = line_list(2)
  val col = line_list(3)
  val value = line_list(4)
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes(cols), Bytes.toBytes(col), Bytes.toBytes(value))
  (new ImmutableBytesWritable(Bytes.toBytes(tablename)), put)
}

已有(5)人评论

跳转到指定楼层
langke93 发表于 2017-12-22 17:07:12
看到你的代码都是灰色的,可能是参数类型等类似问题造成的。目前想到的解决办法,如果在里面有问题,那就把条件判断提前,别放到里面。
回复

使用道具 举报

m331283153 发表于 2017-12-22 17:43:25
你上面没加判断的时候result的类型是RDD[(ImmutableBytesWritable, Put)]这个类型,加了判断之后result里面既有(ImmutableBytesWritable, Put)还有空值类型  ,所以result就变成RDD[Any]了   
你应该是先把空值过滤掉在map
回复

使用道具 举报

m331283153 发表于 2017-12-22 17:44:53
先filter再map
回复

使用道具 举报

grinsky 发表于 2017-12-22 19:13:20
m331283153 发表于 2017-12-22 17:43
你上面没加判断的时候result的类型是RDD[(ImmutableBytesWritable, Put)]这个类型,加了判断之后result里面 ...

嗯嗯,的确是这个问题(我原来以为不满足if的数据就会被丢掉的……,结果这里是原样保存不处理的)不过处理好后,又有新的问题了。
接收到的第一条消息都能正常存储到hbase对应的表中,但执行完第一条后就报错了……

java.lang.IllegalArgumentException: Can not create a Path from an empty string


我已经设置了过滤空的rdd,但好像没什么用
val lines3 = lines2.flatMap(line =>{line.toString.split(splitstr2)}).filter(line => line.length > 2)
val result = lines3.map(line => convertToHbasePut(line,splitstr1))
result.foreachRDD(rdd => {if(! rdd.isEmpty()){rdd.saveAsNewAPIHadoopFile("", classOf[ImmutableBytesWritable], classOf[Put], classOf[MultiTableOutputFormat], jobConf)}})
ssc.start()
ssc.awaitTermination()
回复

使用道具 举报

grinsky 发表于 2017-12-22 20:28:43
解决了,在path那里,填个路径就好了,数据还是写入对应的hbsae的表里
没有存到path里面去。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条