本帖最后由 J20_果农 于 2017-5-11 16:54 编辑
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)
at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2286)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2285)
at com.StepExp$.main(StepExp.scala:79)
at com.StepExp.main(StepExp.scala)
Caused by: java.io.NotSerializableException: oracle.jdbc.driver.OraclePreparedStatementWrapper
Serialization stack:
- object not serializable (class: oracle.jdbc.driver.OraclePreparedStatementWrapper, value: oracle.jdbc.driver.OraclePreparedStatementWrapper@cdbe995)
- field (class: com.StepExp$$anonfun$main$1, name: stmt$1, type: interface java.sql.PreparedStatement)
- object (class com.StepExp$$anonfun$main$1, <function1>)
以上报错提示为序列化对象,但是我在我的Object上已经添加了extends Serializable 还是没起作用奇怪的问题,我删除语句可以正常执行,插入语句就报错。
请教大家,怎么解决这个问题?谢谢!
[mw_shl_code=java,true] df.createTempView("fact_abc")
val sqlDF = sc.sql("select stat_date, data_type, status, login_num, rds_search_num, " +
" rds_send_search_num, rds_browse_num, " +
" rds_send_browse_num, rds_download_resume_num, " +
" rds_exp_resume_num, search_num, send_search_num " +
" browse_num from fact_abc ")
val ds = new OracleDataSource()
ds.setUser("ab")
ds.setPassword("123")
ds.setURL("jdbc:oracle:thin:@192.168.1.1:1521:abc")
val con = ds.getConnection
println("oracle data source Connected !!")
val startMs = System.currentTimeMillis()
val detete = "delete from fact_abc where stat_date = to_date('"+dt+"', 'YYYYMMDD')"
val stmt0 = con.createStatement()
stmt0.execute(detete)
val sql = "insert into fact_ras_base (stat_date, data_type, status, login_num, rds_search_num, "+
" rds_send_search_num, rds_browse_num, rds_send_browse_num, rds_download_resume_num, "+
" rds_exp_resume_num, search_num, send_search_num, browse_num ) " +
"values(to_date(:1, 'yyyymmdd'),:2,:3,:4,:5,:6,:7,:8,:9,:10,:11,:12,:13)"
val stmt = con.prepareStatement(sql)
var rowCount = 0
for(row <- sqlDF ){
println(row.getString(0)+ ", " + row.getString(3))
stmt.setString(1, row.getString(0))
stmt.setString(2, row.getString(1))
stmt.setString(3, row.getString(2))
stmt.setInt(4, row.getInt(3))
stmt.setInt(5, row.getInt(4))
stmt.setInt(6, row.getInt(5))
stmt.setInt(7, row.getInt(6))
stmt.setInt(8, row.getInt(7))
stmt.setInt(9, row.getInt(8))
stmt.setInt(10, row.getInt(9))
stmt.setInt(10, row.getInt(10))
stmt.setInt(10, row.getInt(11))
stmt.setInt(10, row.getInt(12))
rowCount += stmt.executeUpdate()
}
val elapMs = System.currentTimeMillis() - startMs
println(rowCount + "rows inserted - " + elapMs)
con.commit()
stmt0.close()
stmt.close()
con.close()
sc.stop()
}[/mw_shl_code]
|
|