分享

用Spark读写Hbase出现Task not serializable

chimes298 发表于 2016-2-29 11:12:18 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 10 34989
写了一个从Hbase中读数据处理后再保存回Hbase的程序,但是运行会出现Task not serializable报错
代码如下:
[mw_shl_code=scala,true]import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, HTable}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkContext, SparkConf}

object WriteHbase {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WriteHbase")
    val sc = new SparkContext(conf)
    //val outputFile = args(0)

    val getTableName = "TestTable"
    val putTableName = "TestTableOut"
    val configuration = HBaseConfiguration.create()

    configuration.set(TableInputFormat.INPUT_TABLE, getTableName)  //设定表名

    val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val resultRDD = hBaseRDD.map(tuple => tuple._2)

    val arrayRDD = resultRDD.map( res => res.raw() )    //将Result类型转换为Array类型

    val columnRDD = arrayRDD.map (
      res => res.map(
        arr => {
          val rowName = new String(arr.getRow);          //去掉rowName中表示小时的两位
          (rowName, List(( new String (arr.getQualifier) , new String(arr.getValue))))
        }

      )
    )

     val t = columnRDD.map(res => res.reduce{ (x,y) => (x._1,x._2++y._2)})


    t.foreachPartition(
      iter => {
        val hbaseConf = HBaseConfiguration.create();
        val hBaseTable = new HTable(hbaseConf,putTableName);
        iter.foreach(
          tuple => {
            val put = new Put(tuple._1.getBytes());
            val result = tuple._2;
            for( i <- result.indices){
              put.add("PM".getBytes(), result(i)._1.getBytes(), result(i)._2.getBytes())
            }
            hBaseTable.put(put)
          }
        )
      }
    )

    System.exit(0)
  }
}
[/mw_shl_code]

已经在spark-defaults.conf中配置了  spark.serializer  org.apache.spark.serializer.KryoSerializer
而且单独读取Hbase和单独写Hbase都没问题,没有序列化问题,但是在一个程序里同时先读后写就会报序列化问题:
[mw_shl_code=java,true]org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)[/mw_shl_code]

我怀疑是程序中两次 HBaseConfiguration.create()造成的问题,第一次connections池没有关闭的原因吗?
求大神指点

已有(10)人评论

跳转到指定楼层
wscl1213 发表于 2016-2-29 11:34:46
没遇到过这种情况
不过赞同楼主的猜测,如果两个都可以的话,就读取完后,关闭掉。然后再打开写。弄成两个完全独立的试试
回复

使用道具 举报

atsky123 发表于 2016-2-29 14:30:28
HBaseConfiguration.create()这个两个实例可能会冲突,而且第一个应该还没有注销掉。
一个连接应该没关系。

回复

使用道具 举报

chimes298 发表于 2016-2-29 14:31:17
atsky123 发表于 2016-2-29 14:30
HBaseConfiguration.create()这个两个实例可能会冲突,而且第一个应该还没有注销掉。
一个连接应该没关系 ...

请问怎么注销掉第一个HBaseConfiguration.create()?
回复

使用道具 举报

when30 发表于 2016-2-29 14:41:08
chimes298 发表于 2016-2-29 14:31
请问怎么注销掉第一个HBaseConfiguration.create()?

它这个没有,只能等待Java回收了。
楼主可以只创建一个全局HBaseConfiguration就可以了,这样就不会重复了
回复

使用道具 举报

chimes298 发表于 2016-2-29 14:49:01
when30 发表于 2016-2-29 14:41
它这个没有,只能等待Java回收了。
楼主可以只创建一个全局HBaseConfiguration就可以了,这样就不会重复 ...

按照您说得试了一下,创建HTable也用第一次创建的HBaseConfiguration,但是还会报Task not serialization
[mw_shl_code=scala,true]import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, HTable}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by CMDI-W on 2016/2/29.
  */
object WriteHbase {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WriteHbase")
    val sc = new SparkContext(conf)
    //val outputFile = args(0)

    val getTableName = "TestTable"
    val putTableName = "TestTableOut"
    val configuration = HBaseConfiguration.create()

    configuration.set(TableInputFormat.INPUT_TABLE, getTableName)  //设定表名

    val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val resultRDD = hBaseRDD.map(tuple => tuple._2)

    val arrayRDD = resultRDD.map( res => res.raw() )    //将Result类型转换为Array类型

    val columnRDD = arrayRDD.map (
      res => res.map(
        arr => {
          val rowName = new String(arr.getRow);          //取出rowName,columnName,value
          (rowName, List(( new String (arr.getQualifier) , new String(arr.getValue))))
        }

      )
    )

     val t = columnRDD.map(res => res.reduce{ (x,y) => (x._1,x._2++y._2)}).persist()




    t.foreachPartition(
      iter => {
        //val hbaseConf = HBaseConfiguration.create();
        //val hBaseTable = new HTable(hbaseConf,putTableName);
        val hBaseTable = new HTable(configuration,putTableName);
        iter.foreach(
          tuple => {
            val put = new Put(tuple._1.getBytes());
            val result = tuple._2;
            for( i <- result.indices){
              put.add("PM".getBytes(), result(i)._1.getBytes(), result(i)._2.getBytes())
            }
            hBaseTable.put(put)
          }
        )
      }
    )

    System.exit(0)
  }
}[/mw_shl_code]
回复

使用道具 举报

when30 发表于 2016-2-29 15:07:38
读取之后,该关闭的关闭,该注销的注销
回复

使用道具 举报

chimes298 发表于 2016-2-29 16:09:28
when30 发表于 2016-2-29 15:07
读取之后,该关闭的关闭,该注销的注销

谢谢~问题解决了
把读和写分在两个方法里面写,这样读的周期结束,JVM就会自动清理,关闭都时候的Connection池。
[mw_shl_code=scala,true]import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, HTable}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by CMDI-W on 2016/2/29.
  */
object WriteHbaseTest {

  def readHbase(sc:SparkContext) = {
    val getTableName = "TestTable"
    val configuration = HBaseConfiguration.create()
    configuration.set(TableInputFormat.INPUT_TABLE, getTableName)  //设定表名

    val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val resultRDD = hBaseRDD.map(tuple => tuple._2)

    val arrayRDD = resultRDD.map( res => res.raw() )    //将Result类型转换为Array类型

    val columnRDD = arrayRDD.map (
      res => res.map(
        arr => {
          val rowName = new String(arr.getRow);          //取出rowName,columnName,value
          (rowName, List(( new String (arr.getQualifier) , new String(arr.getValue))))
        }

      )
    )
    columnRDD.map(res => res.reduce{ (x,y) => (x._1,x._2++y._2)})
  }

  def writeHbase(t:RDD[(String, List[(String, String)])])={
    val putTableName = "TestTableOut"

    t.foreachPartition(
      iter => {
        val hbaseConf = HBaseConfiguration.create();
        val hBaseTable = new HTable(hbaseConf,putTableName);
        //val hBaseTable = new HTable(configuration,putTableName);
        iter.foreach(
          tuple => {
            val put = new Put(tuple._1.getBytes());
            val result = tuple._2;
            for( i <- result.indices){
              put.add("PM".getBytes(), result(i)._1.getBytes(), result(i)._2.getBytes())
            }
            hBaseTable.put(put)
          }
        )
      }
    )
  }

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WriteHbase")
    val sc = new SparkContext(conf)
    writeHbase(readHbase(sc))

    System.exit(0)
  }

}[/mw_shl_code]
回复

使用道具 举报

轩辕依梦Q 发表于 2016-3-1 18:15:00
mark 一下,多谢分享,如果说建一个连接池,然后从池里拿连接不知道是否可行呢
回复

使用道具 举报

chimes298 发表于 2016-3-3 16:51:54
轩辕依梦Q 发表于 2016-3-1 18:15
mark 一下,多谢分享,如果说建一个连接池,然后从池里拿连接不知道是否可行呢

感觉HBaseConfiguration.create()这个方法封装了建立的连接池,对用户不可见,所以好像不能直接从里面拿连接。
没有试过spark读hbase有没有其他的接口可以这么做。
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条