本帖最后由 remarkzhao 于 2017-8-28 13:26 编辑
如何从sqlserver里把数据导入到hbase里,这个问题纠结了我一个月,各种方法的取舍,以及判断 还是有点迷糊,目前想尝试以下方法 还是有点问题 望
各位大神指教一下
方案: 从sqlserver里拉去dataframe,再保存为RDD,然后在用RDD导入hbase的api进行hbase的数据导入
目前遇到的问题:
1. scala代码编写之后 打成jar进行提交的时候出现下标越界异常
2. 打jar的时候有时候会遇到错误:找不到spark-parent_2.11-2.2.0.jar
在idea开发的时候错误是这样的:SBT project import: [error] Server access Error: Connection timed out: connect url=http://maven.ibiblio.org/maven2/org/apache/spark/spark-parent_2.11/2.2.0/spark-parent_2.11-2.2.0.jar,但是很奇怪,在linux下直接 sbt/sbt pacage 有时候就可以 有时候就不可以。
3. 有没有scala的算子直接把sqlserver里拉出的dataframe 保存为rdd 或者拉出来的就是RDD 不是dataframe ,我这个是先dataframe以保存rdd的形式到本地再用textFile拿出来。
代码如下,望各位大神指点:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object ImportToHBase{
def main(args: Array[String]){
val conf = HBaseConfiguration.create()
val sc = new SparkContext(new SparkConf())
val readFile = sc.textFile("file:///root/tools/spark/mycode/DC_EXAM_RESULT.txt").map(x => x.split(",")) //DC_EXAM_RESULT.txt 是我从sqlserver拉出来dataframe进行 .rdd.saveAsTexFile之后的目录,目录下的问part-00000是csv文件, 以逗号为分隔符
val tableName = "Document"
readFile.foreachPartition{
x=> {
val myConf = HBaseConfiguration.create()
myConf.set("hbase.zookeeper.quorum", "hadoop001,hadoop002,hadoop003")
myConf.set("hbase.zookeeper.property.clientPort", "2181")
myConf.set("hbase.defaults.for.version.skip", "true")
val myTable = new HTable(myConf, TableName.valueOf(tableName))
myTable.setAutoFlush(false, false)
myTable.setWriteBufferSize(5*1024*1024)
x.foreach { y => {
val p = new Put(Bytes.toBytes(y(3))) // csv的第4列为rowkey
p.add("Family".getBytes, "qualifier".getBytes, Bytes.toBytes(y(45))) //csv的第46列为列族的列的内容 ,只有一个列族,一个列。
myTable.put(p)
}
}
myTable.flushCommits()
}
}
}
}
以下是异常:
[Stage 0:> (0 + 1) / 36]15711 [Executor task launch worker for task 0] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ArrayIndexOutOfBoundsException: 45
at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:30)
at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:28)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:28)
at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:20)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
16114 [task-result-getter-0] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 45
at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:30)
at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:28)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:28)
at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:20)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at ImportToHBase$.main(ImportToHBase.scala:19)
at ImportToHBase.main(ImportToHBase.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 45
at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:30)
at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:28)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:28)
at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:20)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
16608 [Thread-2] INFO org.spark_project.jetty.server.AbstractConnector - Stopped Spark@431a7c26{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
|
|