
[Help] How can Spark write into multiple HBase tables?

唐运 posted on 2015-3-22 23:08:04
Our current project needs Spark Streaming. While working on writing to HBase, writing to a single table works fine, but writing to multiple tables has not succeeded. If anyone has done this before, please give me some pointers. Many thanks.


langke93 posted on 2015-3-22 23:30:27
Post your code and the error so we can take a look.

desehawk posted on 2015-3-22 23:36:17

If a single table works, multiple tables should work too. Could it be something inside the program, for example the parameters or variables you are using?

唐运 posted on 2015-3-23 00:41:25
package com.stream

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import com.utils.HBaseUtils
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
import com.bean.LogFormatBean
import scala.collection.mutable.Map
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat

object KafkaToHBase {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println("Usage: KafkaToHBase <master> <zkQuorum> <group> <topics> <HBaseTableName> <brokers>")
      System.exit(1)
    }
    // extract the arguments
    val Array(master, zkQuorum, group, topics, tableName, brokers) = args

    // set the batch interval (1 second)
    val ssc = new StreamingContext(master, "SparkImportHBase", Seconds(1))
    // parse the topic and broker arguments
    val topicsSet = topics.split(",").toSet // receive several topics at once; with a single topic only that topic's data is received
    val kafkaParams = scala.collection.immutable.Map[String, String]("metadata.broker.list" -> brokers)
    // receive the data delivered by Kafka
    val kafkaDstreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    // process each RDD
    val lines = kafkaDstreams.foreachRDD { rdd =>
      // extract the values from the RDD
      val dataSet = rdd.map(_._2)
      saveGameLogToHBase(dataSet, zkQuorum, tableName)
    }
    kafkaDstreams.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Save the raw game logs to HBase.
   */
  def saveGameLogToHBase(rdd: RDD[String], zkQuorum: String, tableName: String): Unit = {
    val conf = HBaseConfiguration.create()
    // set the ZooKeeper quorum address
    conf.set("hbase.zookeeper.quorum", zkQuorum)
    // set the table name if these are not raw logs
    val columnAndvalueMap = Map[String, String]()
    new PairRDDFunctions(rdd.map { line =>
      // build the rowkey
      //val rowkey = HBaseUtils.createRowKey(line, tableName)
      val rowkey = HBaseUtils.buildUUID
      //val tablename = "n2_outer_" + line.split("#")(0) + "_log"
      //conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      gameLogToMap("#", line, columnAndvalueMap)
      val tablebname = line.split("#")(0)
      createHBaseRecord(LogFormatBean.CF, rowkey, columnAndvalueMap, tablebname)
      //}).saveAsHadoopDataset(jobConfig)
    }).saveAsNewAPIHadoopFile("", classOf[String], classOf[String], classOf[MultiTableOutputFormat], conf)
  }

  def gameLogToMap(regex: String, line: String, mp: Map[String, String]) = {
    val s_lines = line.split(regex)
    val length: Int = s_lines.length
    val mp = Map[String, String]()
    // loop over the fields
    for (index <- 1 to length) {
      mp.put("c" + index, s_lines(index - 1))
    }
    mp
  }

  def createHBaseRecord(colfimaly: String, rowKey: String, values: Map[String, String], tablebname: String): (ImmutableBytesWritable, Put) = {
    // create a Put
    val record = new Put(Bytes.toBytes(rowKey))
    // add each field
    val it = values.iterator
    while (it.hasNext) {
      val vls = it.next()
      val colname = vls._1.toString
      println("colname============>" + colname)
      val colvalue = vls._2.toString
      println("colvalue============>" + colvalue)
      if (!colname.eq("") && !colvalue.eq(""))
        record.add(Bytes.toBytes(colfimaly), Bytes.toBytes(colname), Bytes.toBytes(colvalue))
      else
        record.add(Bytes.toBytes(colfimaly), Bytes.toBytes("c1"), Bytes.toBytes("valueIsNull"))
    }
    // return the table name and the Put
    (new ImmutableBytesWritable(Bytes.toBytes(tablebname)), record)
  }
}

唐运 posted on 2015-3-23 00:42:30
The single-table version follows the official example. This multi-table version was written after studying the API and the source code, and it now fails with the error below:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 290.0 failed 4 times, most recent failure: Lost task 3.3 in stage 290.0 (TID 1115, dn14.9961.tj): java.lang.IllegalArgumentException: No columns to insert
        at org.apache.hadoop.hbase.client.HTable.validatePut(HTable.java:1007)
        at org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:800)
        at org.apache.hadoop.hbase.client.HTable.put(HTable.java:786)
        at org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat$MultiTableRecordWriter.write(MultiTableOutputFormat.java:132)
        at org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat$MultiTableRecordWriter.write(MultiTableOutputFormat.java:68)
        at org.apache.spark.rdd.PairRDDFunctions$anonfun$12.apply(PairRDDFunctions.scala:1000)
        at org.apache.spark.rdd.PairRDDFunctions$anonfun$12.apply(PairRDDFunctions.scala:979)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
        at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
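For reference, a minimal sketch of the single-table pattern the post says already works, assuming the classic JobConf plus old-API TableOutputFormat approach hinted at by the commented-out saveAsHadoopDataset(jobConfig) line in the code above. SingleTableSketch, saveToSingleTable, the "cf"/"c1" names, and the UUID rowkey are placeholders for illustration, not the poster's actual code.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object SingleTableSketch {
  // Write every record of one RDD into a single, fixed HBase table.
  def saveToSingleTable(rdd: RDD[String], zkQuorum: String, tableName: String): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkQuorum)
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName) // one target table for the whole RDD
    rdd.map { line =>
      val put = new Put(Bytes.toBytes(java.util.UUID.randomUUID().toString)) // placeholder rowkey
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes(line)) // placeholder family/qualifier
      (new ImmutableBytesWritable, put)
    }.saveAsHadoopDataset(jobConf)
  }
}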

arsenduan posted on 2015-3-23 01:01:05

The problem is probably here. Check whether the loop or a boundary condition is causing it. Print the values one by one first; colname may well be missing because of a boundary issue.
while (it.hasNext) {
      val vls = it.next()
      val colname = vls._1.toString
      println("colname============>"+colname)
      val colvalue = vls._2.toString
      println("colvalue============>"+colvalue)
      if(!colname.eq("") && !colvalue.eq(""))
       // record.add(Bytes.toBytes(colfimaly), Bytes.toBytes(colname), Bytes.toBytes(colvalue))
      else
         //record.add(Bytes.toBytes(colfimaly), Bytes.toBytes("c1"), Bytes.toBytes("valueIsNull"))
    }


唐运 posted on 2015-3-23 01:01:27
I really embarrassed myself today; it was a very low-level mistake.
The corrected code is as follows:
package com.stream

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import com.utils.HBaseUtils
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
import com.bean.LogFormatBean
import scala.collection.mutable.Map
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat

object KafkaToHBase {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println("Usage: KafkaToHBase <master> <zkQuorum> <group> <topics> <HBaseTableName> <brokers>")
      System.exit(1)
    }
    // extract the arguments
    val Array(master, zkQuorum, group, topics, tableName, brokers) = args

    // set the batch interval (1 second)
    val ssc = new StreamingContext(master, "SparkImportHBase", Seconds(1))
    // parse the topic and broker arguments
    val topicsSet = topics.split(",").toSet // receive several topics at once; with a single topic only that topic's data is received
    val kafkaParams = scala.collection.immutable.Map[String, String]("metadata.broker.list" -> brokers)
    // receive the data delivered by Kafka
    val kafkaDstreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    // process each RDD
    val lines = kafkaDstreams.foreachRDD { rdd =>
      // extract the values from the RDD
      val dataSet = rdd.map(_._2)
      saveGameLogToHBase(dataSet, zkQuorum, tableName)
    }
    kafkaDstreams.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Save the raw game logs to HBase.
   */
  def saveGameLogToHBase(rdd: RDD[String], zkQuorum: String, tableName: String): Unit = {
    val conf = HBaseConfiguration.create()
    // set the ZooKeeper quorum address
    conf.set("hbase.zookeeper.quorum", zkQuorum)
    // set the table name if these are not raw logs
    new PairRDDFunctions(rdd.map { line =>
      // build the rowkey
      //val rowkey = HBaseUtils.createRowKey(line, tableName)
      val rowkey = HBaseUtils.buildUUID
      //val tablename = "n2_outer_" + line.split("#")(0) + "_log"
      //conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      val columnAndvalueMap = gameLogToMap("#", line)
      val tablebname = line.split("#")(0)
      createHBaseRecord(LogFormatBean.CF, rowkey, columnAndvalueMap, tablebname)
      //}).saveAsHadoopDataset(jobConfig)
    }).saveAsNewAPIHadoopFile("", classOf[String], classOf[String], classOf[MultiTableOutputFormat], conf)
  }

  def gameLogToMap(regex: String, line: String) = {
    val s_lines = line.split(regex)
    val length: Int = s_lines.length
    val mp = Map[String, String]()
    // loop over the fields
    for (index <- 1 to length) {
      mp.put("c" + index, s_lines(index - 1))
    }
    mp
  }

  def createHBaseRecord(colfimaly: String, rowKey: String, values: Map[String, String], tablebname: String): (ImmutableBytesWritable, Put) = {
    // create a Put
    val record = new Put(Bytes.toBytes(rowKey))
    // add each field
    val it = values.iterator
    while (it.hasNext) {
      val vls = it.next()
      val colname = vls._1.toString
      println("colname============>" + colname)
      val colvalue = vls._2.toString
      println("colvalue============>" + colvalue)
      record.add(Bytes.toBytes(colfimaly), Bytes.toBytes(colname), Bytes.toBytes(colvalue))
    }
    // return the table name and the Put
    (new ImmutableBytesWritable(Bytes.toBytes(tablebname)), record)
  }
}

arsenduan posted on 2015-3-23 01:03:01
Quoting 唐运 (2015-3-23 00:42):
The single-table version follows the official example. This multi-table version was written after studying the API and the source code, and it now fails with an error.

Don't just look at the columns; also check whether anything is empty. The Put probably ended up with nothing in it.
See: HBase - No columns to insert
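A minimal illustration of what that exception means, assuming the 0.98-era HBase client API shown in the stack trace: HTable.validatePut rejects any Put whose column map is empty, which is exactly what an empty columnAndvalueMap produces. EmptyPutSketch and the "cf"/"c1"/"v1" names are illustrative placeholders.

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

object EmptyPutSketch extends App {
  val put = new Put(Bytes.toBytes("some-rowkey"))   // a rowkey but no columns added yet
  println(put.isEmpty)                              // true -> writing this Put raises "No columns to insert"
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("v1"))
  println(put.isEmpty)                              // false -> this Put would be accepted
}

One defensive option (an assumption, not something proposed in this thread) would be to filter out empty Puts before saveAsNewAPIHadoopFile, e.g. .filter { case (_, put) => !put.isEmpty }.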

唐运 posted on 2015-3-23 08:12:50
Quoting arsenduan (2015-3-23 01:03):
Don't just look at the columns; also check whether anything is empty. The Put probably ended up with nothing in it.
See: HBase - No columns to insert

It was indeed the empty values: one of my methods returns a value, I never captured that return value, so the Map used when writing to HBase was empty.
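A stripped-down sketch of that mistake (DiscardedReturnSketch and the log line "login#1001#2015-03-22" are made up for illustration): the helper fills and returns a new Map, but the calling code never captured the return value, so the Map handed to the HBase writer stayed empty.

import scala.collection.mutable.Map

object DiscardedReturnSketch extends App {
  def gameLogToMap(regex: String, line: String): Map[String, String] = {
    val mp = Map[String, String]()
    val fields = line.split(regex)
    for (index <- 1 to fields.length) mp.put("c" + index, fields(index - 1))
    mp
  }

  // Buggy pattern: the returned Map is thrown away, so this one stays empty.
  val columnAndvalueMap = Map[String, String]()
  gameLogToMap("#", "login#1001#2015-03-22")        // return value silently discarded
  println(columnAndvalueMap.isEmpty)                // true -> empty Puts -> "No columns to insert"

  // Fix: capture the return value and build the Put from it.
  val filled = gameLogToMap("#", "login#1001#2015-03-22")
  println(filled)                                   // c1 -> login, c2 -> 1001, c3 -> 2015-03-22 (order not guaranteed)
}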


