分享

求助,KafkaUtils.createStream运行时报错

SingleDee 发表于 2015-9-11 22:12:35 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 8 47624
Spark版本号是1.4.1
scala版本是2.10.5
kafka对应的版本是2.11-1.4.1
在运行到
JavaPairReceiverInputDStream<String, String> messages =
                    KafkaUtils.createStream(jssc, "localhost:2181", "233", topicMap, StorageLevels.MEMORY_AND_DISK_SER);

的时候抛出这样的异常

Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
        at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:63)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103)
        at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
        at CallAnalysis.main(CallAnalysis.java:38)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


请问是哪方面的原因?如果使用createDirectStream的话,各个参数是什么意思?
KafkaUtils.createDirectStream(
StreamContext arg0,
Class<k> arg1,
Class<V> arg2,
Class<KD> arg3,
Class<VD> arg4,
Map<String,String> arg5,
Set<String> arg6)

已有(8)人评论

跳转到指定楼层
Alkaloid0515 发表于 2015-9-11 22:48:09
第一个问题,检查下是不是忘加包了
第二个问题,对于陌生的函数,最好的就是查看源码,源码中都有他们的注释,这是最准确的解释
回复

使用道具 举报

SingleDee 发表于 2015-9-14 11:29:45
问题终于解决了
spark-streaming-kafka_2.10-1.4.1.jar
这个依赖包只是第一个依赖包,而且版本号有挺严格的限制
第一个2.10代表是scala的版本号,第二个是代表spark的版本号,必须对应好才能弹出第二个错误
第二个错误是需要再增加一个依赖包
spark-streaming-kafka-assembly_2.10-1.4.1.jar
这个包好像才是kafka各种方法在spark上支持的包
第一个只是一个工具包用来创建输入流



-------------------------------------继续出现问题,求解答-------------------------------

创建好输入流之后,能连接上kafka和zookeeper,但是在productor里面输入东西,consumer能成功获取,但在spark程序就不能获取consumer的内容。。请问这是怎么回事》?
回复

使用道具 举报

mituan2008 发表于 2015-9-14 11:46:56


没有太懂,给楼主提供篇文章,看看是否有帮助

Spark(1.2.0) Streaming 集成 Kafka 总结
http://www.aboutyun.com/thread-11536-1-1.html

回复

使用道具 举报

chyeers 发表于 2015-9-14 15:25:06
楼主求教下,请问你的程序是怎么 spark-submit的?我自己也写了一个kafka-streaming 的程序,结果是 standalone local模式可以运行,standalone cluster模式报Caused by: java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ 这个错误。[mw_shl_code=shell,true]spark-submit --master spark://h005:6066 \
             --class FromKafkaToPhoenix \
             --deploy-mode cluster \
             --supervise \
             --executor-memory 2G \
             --total-executor-cores 10 \
             --conf "spark.driver.extraClassPath=hbase-protocol-0.98.12.1-hadoop2.jar" \
             --conf "spark.executor.extraClassPath=hbase-protocol-0.98.12.1-hadoop2.jar" \
             --jars spark-streaming-kafka-assembly_2.10-1.4.1.jar,guava-12.0.1.jar,hbase-client-0.98.12.1-hadoop2.jar,hbase-common-0.98.12.1-hadoop2.jar,hbase-protocol
-0.98.12.1-hadoop2.jar,hbase-server-0.98.12.1-hadoop2.jar,htrace-core-2.04.jar,jdom-2.0.5.jar,jruby-complete-1.6.8.jar,mysql-connector-java-5.0.8.jar,phoenix-core-4.4.
0-HBase-0.98.jar,phoenix-spark-4.4.0-HBase-0.98.jar,spark-assembly-1.4.1-hadoop2.2.0.jar \
             testkafka.jar h001:9092 testkafka[/mw_shl_code]
回复

使用道具 举报

SingleDee 发表于 2015-9-14 15:29:45
chyeers 发表于 2015-9-14 15:25
楼主求教下,请问你的程序是怎么 spark-submit的?我自己也写了一个kafka-streaming 的程序,结果是 standa ...

java.lang.NoClassDefFoundError: 这类错误其实可以把所有需要用到的依赖包放到spark下的lib文件夹里面,然后在conf/spark-env.sh文件里面添加
export SPARK_CLASS=你的依赖包的绝对路径,用冒号分隔开



例如这样


export SPARK_CLASSPATH=/home/SingleDee/spark/lib/spark-streaming-kafka_2.10-1.4.1.jar:/home/SingleDee/spark/lib/spark-streaming-k
afka-assembly_2.10-1.4.1.jar:/home/SingleDee/spark/lib/jedis-2.7.2.jar

我这样做的话,我运行submit的时候就只需要
bin/spark-submit --class "JavaNetworkWordCount" --master local[4] simple.jar localhost 4999




回复

使用道具 举报

SingleDee 发表于 2015-9-14 15:34:41
再更新一下进度
Spark能连接kafka了。。。
好突然地,不知道为什么,就好了(没认真学习,乱摸的后果)
中途修改过server,zookeepker的配置文件,把localhost改成自己本机ip
然后把consumer的group改成纯英文(之前是纯数字233。。。。)
---------------------------------还是有下一个问题----------------------------------------
怎么在Streaming的程序中设置offset位移控制?java语言,scala我看到有asinsteadofXXX的方法,但java里面没找到。。。

回复

使用道具 举报

chyeers 发表于 2015-9-14 17:52:37
SingleDee 发表于 2015-9-14 15:29
java.lang.NoClassDefFoundError: 这类错误其实可以把所有需要用到的依赖包放到spark下的lib文件夹里面, ...

我配置了 spark_classpath,使用过程中有这个错误报出来,而且这样启动,有时有效不报这个错误
[mw_shl_code=shell,true]15/09/14 17:49:02 WARN SparkConf:
SPARK_CLASSPATH was detected (set to '/usr/local/cluster/spark-1.4.1/lib/guava-12.0.1.jar:/usr/local/cluster/spark-1.4.1/lib/hbase-client-0.98.12.1-hadoop2.jar:/usr/local/cluster/spark-1.4.1/lib/hbase-common-0.98.12.1-hadoop2.jar:/usr/local/cluster/spark-1.4.1/lib/hbase-protocol-0.98.12.1-hadoop2.jar:/usr/local/cluster/spark-1.4.1/lib/hbase-server-0.98.12.1-hadoop2.jar:/usr/local/cluster/spark-1.4.1/lib/htrace-core-2.04.jar:/usr/local/cluster/spark-1.4.1/lib/jdom-2.0.5.jar:/usr/local/cluster/spark-1.4.1/lib/jruby-complete-1.6.8.jar:/usr/local/cluster/spark-1.4.1/lib/mysql-connector-java-5.0.8.jar:/usr/local/cluster/spark-1.4.1/lib/phoenix-core-4.4.0-HBase-0.98.jar:/usr/local/cluster/spark-1.4.1/lib/phoenix-spark-4.4.0-HBase-0.98.jar:/usr/local/cluster/spark-1.4.1/lib/phoenix-core-4.4.0-HBase-0.98.jar:/usr/local/cluster/spark-1.4.1/lib/phoenix-spark-4.4.0-HBase-0.98.jar:/usr/local/cluster/spark-1.4.1/lib/spark-assembly-1.4.1-hadoop2.2.0.jar:/usr/local/cluster/spark-1.4.1/lib/spark-streaming-kafka-assembly_2.10-1.4.1.jar:').
This is deprecated in Spark 1.0+.

Please instead use:
- ./spark-submit with --driver-class-path to augment the driver classpath
- spark.executor.extraClassPath to augment the executor classpath[/mw_shl_code]
回复

使用道具 举报

chyeers 发表于 2015-9-14 18:05:15
本帖最后由 chyeers 于 2015-9-14 18:06 编辑
SingleDee 发表于 2015-9-14 15:34
再更新一下进度
Spark能连接kafka了。。。
好突然地,不知道为什么,就好了(没认真学习,乱摸的后果)

spark连接kafka是能够的,我使用的是 checkpoint来防止 offset 丢失
[mw_shl_code=scala,true]val ssc = StreamingContext.getOrCreate("/data/checkpoint/kafka",
      () => {
        createContext(brokers, topics)
      })[/mw_shl_code]
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条