分享

spark DataFrame 通过JDBC把数据插入oracle 报错!

J20_果农 发表于 2017-5-11 16:40:43 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 13826
本帖最后由 J20_果农 于 2017-5-11 16:54 编辑

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
        at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2286)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
        at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2285)
        at com.StepExp$.main(StepExp.scala:79)
        at com.StepExp.main(StepExp.scala)
Caused by: java.io.NotSerializableException: oracle.jdbc.driver.OraclePreparedStatementWrapper
Serialization stack:
        - object not serializable (class: oracle.jdbc.driver.OraclePreparedStatementWrapper, value: oracle.jdbc.driver.OraclePreparedStatementWrapper@cdbe995)

        - field (class: com.StepExp$$anonfun$main$1, name: stmt$1, type: interface java.sql.PreparedStatement)
        - object (class com.StepExp$$anonfun$main$1, <function1>)

以上报错提示为序列化对象,但是我在我的Object上已经添加了extends Serializable 还是没起作用奇怪的问题,我删除语句可以正常执行,插入语句就报错。
请教大家,怎么解决这个问题?谢谢!

[mw_shl_code=java,true]    df.createTempView("fact_abc")
    val sqlDF = sc.sql("select stat_date, data_type, status, login_num, rds_search_num, " +
                       " rds_send_search_num, rds_browse_num, " +
                       " rds_send_browse_num, rds_download_resume_num, " +
                       " rds_exp_resume_num, search_num, send_search_num " +
                       " browse_num from fact_abc ")

    val ds = new OracleDataSource()
    ds.setUser("ab")
    ds.setPassword("123")
    ds.setURL("jdbc:oracle:thin:@192.168.1.1:1521:abc")
    val con = ds.getConnection
    println("oracle data source Connected !!")
    val startMs = System.currentTimeMillis()
    val detete = "delete from fact_abc where stat_date = to_date('"+dt+"', 'YYYYMMDD')"
    val stmt0 = con.createStatement()
    stmt0.execute(detete)
   
    val sql = "insert into fact_ras_base (stat_date, data_type, status, login_num, rds_search_num, "+
                                        " rds_send_search_num, rds_browse_num, rds_send_browse_num, rds_download_resume_num, "+
                                        " rds_exp_resume_num, search_num, send_search_num, browse_num ) " +
                                        "values(to_date(:1, 'yyyymmdd'),:2,:3,:4,:5,:6,:7,:8,:9,:10,:11,:12,:13)"
   
val stmt = con.prepareStatement(sql)

    var rowCount = 0
    for(row <- sqlDF ){
      println(row.getString(0)+ ", " + row.getString(3))

stmt.setString(1, row.getString(0))
      stmt.setString(2, row.getString(1))
      stmt.setString(3, row.getString(2))
      stmt.setInt(4, row.getInt(3))
      stmt.setInt(5, row.getInt(4))
      stmt.setInt(6, row.getInt(5))
      stmt.setInt(7, row.getInt(6))
      stmt.setInt(8, row.getInt(7))
      stmt.setInt(9, row.getInt(8))
      stmt.setInt(10, row.getInt(9))
      stmt.setInt(10, row.getInt(10))
      stmt.setInt(10, row.getInt(11))
      stmt.setInt(10, row.getInt(12))
      rowCount += stmt.executeUpdate()
    }
    val elapMs = System.currentTimeMillis() - startMs
    println(rowCount + "rows inserted - " + elapMs)
    con.commit()
    stmt0.close()
    stmt.close()
    con.close()
    sc.stop()
  }[/mw_shl_code]


已有(3)人评论

跳转到指定楼层
qcbb001 发表于 2017-5-11 17:55:29
prepareStatement的问题。采用下面方式试试
Statement stmt = con.createStatement();

StringBuffer sql = new StringBuffer("insert into user values (" + list.get(1).getId()+", "+list.get(1).getUserName()+", "+list.get(1).getPassword()+")");
for(int i = 2; i < list.size(); i++){
    sql.append(", ("+ list.get(i).getId()+", "+list.get(i).getUserName()+", "+list.get(i).getPassword()+")");//
}
sql.append(";");

  ResultSet rs = stmt.executeQuery(sql);

回复

使用道具 举报

J20_果农 发表于 2017-5-11 18:31:56
qcbb001 发表于 2017-5-11 17:55
prepareStatement的问题。采用下面方式试试
Statement stmt = con.createStatement();

这种方式试过了,也是同样的问题。
大家平时没用过吗?怎么用?
回复

使用道具 举报

J20_果农 发表于 2017-5-11 19:21:19
已解决, 还真是使用prepareStatement 的问题,改成 con.createStatement();  就没问题了。谢谢qcbb001
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条