package cn.chinahadoop.streaming
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Chen Chao
 */
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: HdfsWordCount <master> <directory> <seconds>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster(args(0))
    val sc = new SparkContext(sparkConf)
    // Create the StreamingContext with the batch interval taken from the third argument
    val ssc = new StreamingContext(sc, Seconds(args(2).toInt))
    ssc.checkpoint(".")

    // State update function for updateStateByKey: add this batch's counts to the running total
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    // Create a FileInputDStream pointing at the given directory
    val lines = ssc.textFileStream(args(1))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    //.updateStateByKey[Int](updateFunc)
    //wordCounts.print()

    // Save the data to HBase
    wordCounts.foreachRDD(rdd => {
      println("=============save hbase==================")
      if (!rdd.isEmpty()) {
        // Only inspect the RDD after the empty check: first() throws on an empty RDD
        println("rdd : " + rdd.first())
        println("rdd count : " + rdd.count())
        rdd.foreachPartition(partitionRecords => {
          val configuration = HBaseConfiguration.create()
          configuration.set("hbase.zookeeper.property.clientPort", "2181")
          configuration.set("hbase.zookeeper.quorum", "192.168.xxx.xxx")
          configuration.set("hbase.master", "192.168.xxx.xxx:60000")
          val connection = ConnectionFactory.createConnection(configuration)
          val tableName = TableName.valueOf("tableName")
          val table = connection.getTable(tableName)
          try {
            partitionRecords.foreach(data => {
              println("data : " + data)
              // Row key: reversed timestamp prefix + the word
              val put = new Put(Bytes.toBytes(String.valueOf(System.currentTimeMillis()).reverse + data._1))
              put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(data._1))
              put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(data._2))
              table.put(put)
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            // Close the table and connection once per partition, not after the first record
            table.close()
            connection.close()
          }
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
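
To check whether any rows are actually reaching the table, a small read-back sketch like the one below can be run separately. This is only a sketch: it assumes the same ZooKeeper settings as the job above, and that the table "tableName" with column family "cf" has already been created (for example with create 'tableName', 'cf' in the HBase shell).

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes

// Standalone read-back check, separate from the streaming job above.
// Assumptions: same ZooKeeper quorum/port as the job; table "tableName" with
// column family "cf" already exists and holds the rows written by the job.
object HBaseReadCheck {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "192.168.xxx.xxx")
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("tableName"))
    try {
      val scanner = table.getScanner(new Scan())
      // Peek at the first 10 rows and decode the two columns written by the job
      scanner.next(10).foreach { result =>
        val word = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("word")))
        val count = Bytes.toInt(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("count")))
        println(s"row=${Bytes.toString(result.getRow)} word=$word count=$count")
      }
      scanner.close()
    } finally {
      table.close()
      connection.close()
    }
  }
}

If this scan comes back empty while the streaming job's debug prints show records in each batch, the problem is on the write path rather than with the source data.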
Hi all, when I run the code above, the data never makes it into the tableName table in HBase, even though the data itself is there. Is the problem in the code, or is it something else? Any pointers would be much appreciated.
Logs: