import com.chanct.idap.ssm.common.{HbaseUtils, PlatformConfig}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by lichao on 16-4-4.
*
*/
object KafKa2SparkStreaming2Hbase {
def main(args: Array[String]) {
val zkQuorum = "c1d8:2181,c1d9:2181,c1d10:2181"
val group = "1"
val topics = "scala_api_topic"
val numThreads = 2
val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").setMaster("local[2]").set("spark.eventLog.overwrite","true")
// val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").set("spark.eventLog.overwrite","true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
// val store = wordCounts.foreachRDD(rdd => rdd.foreach(KafKa2SparkStreaming2Hbase.blah))
wordCounts.foreachRDD {
rdd => rdd.foreachPartition {
partition =>
// println("\n\n\n\n=======================================\n\n\n\n")
val conf = PlatformConfig.loadHbaseConf()
val conn = HbaseUtils.getConnection(conf)
val userTable = TableName.valueOf("WCTest")
val table = conn.getTable(userTable)
// println("\n\n\n\n-----------------------\n\n\n\n"+conf+"==>"+table.getName)
partition.foreach {
w =>
try {
val put = new Put(Bytes.toBytes(System.currentTimeMillis().toString))
put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("value"), Bytes.toBytes(w._1.toString))
put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("count"), Bytes.toBytes(w._2.toString))
table.put(put)
} catch {
case _: Exception => println("raw error!")
}
}
table.close()
conn.close()
}
}
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
|