从kafka里面取数据到sparkStreaming里面,然后再把数据保存到数据中
关键点,不是每个数据都需要创建连接,只需要为每个分区创建一个连接就可以了
下面是一个简单的例子
从kafka里面取数据到sparkStreaming里面,然后再把数据保存到数据中
关键点,不是每个数据都需要创建连接,只需要为每个分区创建一个连接就可以了
下面是一个简单的例子
[mw_shl_code=scala,true]import java.sql.{ Connection, DriverManager }
import com.oracle._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{ SparkConf, SparkContext }
/**
* 从kafka上面读取数据,然后保存到数据库上面,虽然一般不建议把数据保存到数据库中,(保存到数据库中的时候可以建立连接池)
* 如果是保存到hbase上面也可以使用这样的方法,为每个分区创建一个RDD连接,而不是为每个数据建立一个连接
*/
object StreamToOracle {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))
//ssc.checkpoint("checkpoint")
val topic = "test"
val topicMap = topic.split(",").map((_, 1)).toMap
val lines = KafkaUtils.createStream(ssc, "192.168.10.209:2181,192.168.10.219:2181,192.168.10.199:2181", "ssk", topicMap).map(_._2)
//每个RDD进行操作
lines.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
//重点在这里,在每一个分区里面建立一个连接
val connection = getConnection()
partitionOfRecords.foreach(line => {
val info = line.split(":")
val ip = info(0)
val mesType = info(1)
val data = info(2)
val timeStamp = info(3)
val sql = "insert into MONITOR_DATA values(AUTO_INCREMENT.NEXTVAL,'" + ip + "','" + mesType + "','" + data + "'," + timeStamp + ")"
saveToOracle(connection, sql)
})
closeConn(connection)
})
val words = rdd.flatMap(_.split(":"))
})
ssc.start()
ssc.awaitTermination()
}
//保存数据
def saveToOracle(con: Connection, sql: String): Int = {
val ps = con.prepareStatement(sql);
val res = ps.executeUpdate()
ps.close()
res
}
//关闭连接
def closeConn(con: Connection) = {
con.close()
}
//得到连接
def getConnection(): Connection = {
val url = "jdbc:oracle:thin:@//192.168.10.100:1521/UCLOUD"
Class.forName("oracle.jdbc.driver.OracleDriver").newInstance();
val con = DriverManager.getConnection(url, "scott", "scott");
con
}
}[/mw_shl_code]
来自:韩利鹏
|
|