[mw_shl_code=scala,true]class CustomDataSourceProvider extends DataSourceRegister
with StreamSourceProvider
with Logging {
// Override some functions ...
}[/mw_shl_code]
// Starting offset for this table: resume from `start` when a prior batch
// committed one, otherwise read from the beginning of the table.
val offset: Long = start.map(offset2Map(_)(tableName)).getOrElse(0L)
// Number of rows between the start and end offsets of this micro-batch.
val limit = offset2Map(end)(tableName) - offset
val sql = s"SELECT * FROM $tableName limit $limit offset $offset"
val st = conn.prepareStatement(sql)
val rs = st.executeQuery()
// TODO (from original note "好用"): confirm this ResultSet -> InternalRow conversion works as intended.
val rows: Iterator[InternalRow] = JdbcUtils.resultSetToSparkInternalRows(rs, schemaOption.get, inputMetrics)
// NOTE(review): toSeq materializes the entire batch on the driver before
// parallelizing — confirm batch sizes stay small enough for this.
val rdd = sqlContext.sparkContext.parallelize(rows.toSeq)
/**
 * Advances the offset for `tableName` by at most `limit` rows, never
 * moving past the latest available offset.
 *
 * @param limit         maximum number of rows to advance in one batch
 * @param currentOffset current committed offset per table
 * @param latestOffset  newest available offset per table
 * @return a single-entry map with the capped next offset for `tableName`
 */
def rateLimit(limit: Long, currentOffset: Map[String, Long], latestOffset: Map[String, Long]): Map[String, Long] = {
  // min(current + limit, latest) is exactly the "cap at latest" rule.
  val next = math.min(currentOffset(tableName) + limit, latestOffset(tableName))
  Map(tableName -> next)
}
/**
 * Fetches the latest offset for the table, defined as its current total
 * row count (`SELECT COUNT(1)`).
 *
 * Fix: the original never closed the statement or result set, leaking a
 * JDBC cursor on every poll; both are now released via try/finally.
 *
 * @return a single-entry map of `tableName` -> current row count
 *         (0 if the query unexpectedly returns no row)
 */
def getLatestOffset: Map[String, Long] = {
  val sql = s"SELECT COUNT(1) FROM $tableName"
  val st = conn.prepareStatement(sql)
  try {
    val rs = st.executeQuery()
    try {
      // COUNT(1) always yields exactly one row; no loop needed.
      val count = if (rs.next()) rs.getLong(1) else 0L
      Map[String, Long](tableName -> count)
    } finally {
      rs.close()
    }
  } finally {
    st.close()
  }
}
/**
 * Deserializes a streaming [[Offset]] back into its per-table offset map.
 * The offset's JSON payload is expected to be an object of table name -> row count.
 *
 * @param offset the serialized offset produced by this source
 * @return the decoded table-name -> offset map
 */
def offset2Map(offset: Offset): Map[String, Long] = {
  // Plain JSON (no type hints) is sufficient for a Map[String, Long].
  implicit val formats: Formats = Serialization.formats(NoTypeHints)
  Serialization.read[Map[String, Long]](offset.json())
}
}
case class MySQLSourceOffset(offset: Map[String, Long]) extends Offset {
implicit val formats: AnyRef with Formats = Serialization.formats(NoTypeHints)
[mw_shl_code=scala,true]class CustomDataSourceProvider extends DataSourceRegister
with StreamSourceProvider
with StreamSinkProvider
with Logging {
// Override some functions ...
}[/mw_shl_code]