本帖最后由 howtodown 于 2014-11-4 14:52 编辑
问题导读
1.在Spark中提供了一个JdbcRDD类,该RDD就是读取JDBC中的数据并转换成RDD?
2.JdbcRDD类构造函数的各个参数的含义你认为都是什么?
在Spark中提供了一个JdbcRDD类,该RDD就是读取JDBC中的数据并转换成RDD,之后我们就可以对该RDD进行各种的操作。我们先看看该类的构造函数:
JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _) 复制代码
这个类带了很多参数,关于这个函数的各个参数的含义,
1、getConnection 返回一个已经打开的结构化数据库连接,JdbcRDD会自动维护关闭。
2、sql 是查询语句,此查询语句必须包含两处占位符?来作为分割数据库ResulSet的参数,例如:”select title, author from books where ? < = id and id <= ?”
3、lowerBound, upperBound, numPartitions 分别为第一、第二占位符,partition的个数。例如,给出lowebound 1,upperbound 20, numpartitions 2,则查询分别为(1, 10)与(11, 20)
4、mapRow 是转换函数,将返回的ResultSet转成RDD需用的单行数据,此处可以选择Array或其他,也可以是自定义的case class。默认的是将ResultSet 转换成一个Object数组。
下面我们说明如何使用该类。
package scala
import java.sql.DriverManager
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
object SparkToJDBC {
def main(args: Array[String]) {
val sc = new SparkContext("local", "mysql")
val rdd = new JdbcRDD(
sc,
() => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
},
"SELECT content FROM mysqltest WHERE ID >= ? AND ID <= ?",
1, 100, 3,
r => r.getString(1)).cache()
print(rdd.filter(_.contains("success")).count())
sc.stop()
}
} 复制代码
代码比较简短,主要是读mysqltest 表中的数据,并统计ID>=1 && ID < = 100 && content.contains("success")的记录条数。我们从代码中可以看出JdbcRDD的sql参数要带有两个?的占位符,而这两个占位符是给参数lowerBound和参数upperBound定义where语句的上下边界的。从JdbcRDD类的构造函数可以知道,参数lowerBound和参数upperBound都只能是Long类型的,并不支持其他类型的比较,这个使得JdbcRDD使用场景比较有限。而且在使用过程中sql参数必须有类似 ID >= ? AND ID < = ?这样的where语句,如果你写成下面的形式:
val rdd = new JdbcRDD(
sc,
() => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
},
"SELECT content FROM mysqltest",
1, 100, 3,
r => r.getString(1)).cache() 复制代码
那不好意思,运行的时候会出现以下的错误:
2014-09-10 15:47:45,621 (Executor task launch worker-0) [ERROR -
org.apache.spark.Logging$class.logError(Logging.scala:95)] Exception in task ID 1
java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0).
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1074)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:988)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:974)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:919)
at com.mysql.jdbc.PreparedStatement.checkBounds(PreparedStatement.java:3813)
at com.mysql.jdbc.PreparedStatement.setInternal(PreparedStatement.java:3795)
at com.mysql.jdbc.PreparedStatement.setInternal(PreparedStatement.java:3840)
at com.mysql.jdbc.PreparedStatement.setLong(PreparedStatement.java:3857)
at org.apache.spark.rdd.JdbcRDD$anon$1.<init>(JdbcRDD.scala:84)
at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:70)
at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:50)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619) 复制代码
看下JdbcRDD类的compute函数实现就知道了:
override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
context.addOnCompleteCallback{ () => closeIfNeeded() }
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY)
if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
stmt.setFetchSize(Integer.MIN_VALUE)
logInfo("statement fetch size set to: " + stmt.getFetchSize +
" to force mySQL streaming ")
}
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
........................... 复制代码
不过值得高兴的是,我们可以自定义一个JdbcRDD,修改上面的计算思路,这样就可以得到符合我们自己要求的JdbcRDD。
对于JAVA例子,JdbcRDD类的最后一个参数很不好传,很多人都没有实现
http://www.songyafei.cn/post/a0d5b_26775a3