wangweislk 发表于 2015-11-18 12:13:33

SparkStreaming写入Hbase遇到包问题,跪求各位大神帮忙

在使用SparkStreaming写入Hbase时,出现下面的错误。



提交命令:
spark-submit\
--master yarn-client \
--driver-memory 10g \
--executor-memory 20g \
--num-executors 10 \
--executor-cores 6 \
--jars /data1/bbdhadoop/bbdhadoop/jars/stanford-corenlp-3.4.1.jar,/data1/bbdhadoop/bbdhadoop/jars/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/htrace-core-3.1.0-incubating.jar \
--class com.bbd.test.Test /data1/bbdhadoop/wangwei/glf/testhbase2.jar c2namenode3 test_kafka_streaming test 1 \



mapToPair.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
                        @Override
                        public Void call(JavaPairRDD<String, Integer> values,Time time) throws Exception {

                              values.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                                        @Override
                                        public void call(Tuple2<String, Integer> tuple)
                                                      throws Exception {
                                                Configuration conf = HBaseConfiguration.create();
                                                conf.set("hbase.zookeeper.quorum", "c2namenode3,c2namenode2,c2namenode1");
                                                HTable table = new HTable(conf, "hbase_test".getBytes());
//                                                HTableInterface table = hTablePool.getTable("hbase_test");
                                                Put put = new Put(tuple._1().getBytes());
                                                put.add("cf".getBytes(), "val".getBytes(), (tuple._2+"").getBytes());
                                                table.put(put);
                                                table.close();
                                        }
                              });

                              return null;
                        }
                });
                mapToPair.print();
               
                jssc.start();
                jssc.awaitTermination();



bioger_hit 发表于 2015-11-18 13:40:20

楼主

spark-submit\
--master yarn-client \
--driver-memory 10g \
--executor-memory 20g \
--num-executors 10 \
--executor-cores 6 \

master yarn-client而不是yarn-cluster?


对hbase操作,我一般用下面代码
                conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.quorum", "master");// 使用eclipse时必须添加这个,否则无法定位master需要配置hosts
                conf.set("hbase.zookeeper.property.clientPort", "2181");

wangweislk 发表于 2015-11-18 13:59:07

bioger_hit 发表于 2015-11-18 13:40
楼主

spark-submit\


yarn-client和yarn-cluster我都试过的。对于加端口,我再直接测试hbase的时指定zk就行,我试试加上端口

wangweislk 发表于 2015-11-18 15:24:08

bioger_hit 发表于 2015-11-18 13:40
楼主

spark-submit\


我使用官方给的Python例子也是异常的问题。

if __name__ == "__main__":
    if len(sys.argv) != 7:
      print("""
      Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>

      Run with example jar:
      ./bin/spark-submit --driver-class-path /path/to/example/jar \
      /path/to/examples/hbase_outputformat.py <args>
      Assumes you have created <table> with column family <family> in HBase
      running on <host> already
      """, file=sys.stderr)
      exit(-1)

    host = sys.argv[1]
    table = sys.argv[2]
    sc = SparkContext(appName="HBaseOutputFormat")

    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    sc.parallelize(3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
      conf=conf,
      keyConverter=keyConv,
      valueConverter=valueConv)

    sc.stop()

bioger_hit 发表于 2015-11-18 18:42:40

wangweislk 发表于 2015-11-18 15:24
我使用官方给的Python例子也是异常的问题。

集群正常吗?是按照正常的方式提交集群

wangweislk 发表于 2015-11-18 20:04:09

bioger_hit 发表于 2015-11-18 18:42
集群正常吗?是按照正常的方式提交集群

集群OK的

wangweislk 发表于 2015-11-19 09:29:28

bioger_hit 发表于 2015-11-18 18:42
集群正常吗?是按照正常的方式提交集群

我已经使用官方给的例子用Java和Python都测试过了,都是找不到org.apache.htrace.Trace包,要怎么将这个加到classpath,这个类的包和org.cloudera.htrace是不是有冲突

wangweislk 发表于 2015-11-19 14:33:55

整了几天终于OK了,尼玛才是环境变量问题。
shell:
HBASE_HOME=/opt/cloudera/parcels/CDH/lib/hbase
#HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export SPARK_CLASSPATH="$HBASE_HOME/conf/:$HBASE_HOME/hbase-client.jar:$HBASE_HOME/hbase-protocol.jar:$HBASE_HOME/lib/htrace-core.jar:$HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar"&&
页: [1]
查看完整版本: SparkStreaming写入Hbase遇到包问题,跪求各位大神帮忙