分享

spark streaming kafka运行出错,找不到 com/yammer/metrics/Metrics

titanic 发表于 2015-10-16 14:42:11 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 5 64363
kafka是CDH里面的,版本是0.8.2.0-1.kafka1.3.0.p0.29,CDH是5.4.2版本,hadoop是2.6.0,spark是1.5版本.ubuntu系统使用intellij14,已经与spark集群配置了hosts

以下是运行代码
---------------------------------------------------------------------------------
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;


import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;


import java.util.HashSet;


import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;



public class Test1 {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        init();
    }

    private static void init()
    {
        // TODO Auto-generated method stub



        String brokers = "192.168.3.201:9092,192.168.3.202:9092,192.168.3.203:9092";
        String topics = "titanic";
//        String group = "HJW.T2";
//        String zookeeper = "127.0.0.1:2181";

        // Create context with 2 second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount_test1");
        sparkConf.setJars(new String[]{"/home/titanic/soft/intelijWorkspace/spark/com-hadoop-spark/target/com-hadoop-spark-1.0-SNAPSHOT-jar-with-dependencies.jar"});
        sparkConf.setMaster("spark://t1m1.tcloud.com:7077");
        //String[] jars = {"E:/download/eclipse-jee-juno-SR2-win32/workspace/test_spark_kafka/build/test_spark_kafka.jar"};
        //sparkConf.setJars(jars);
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));


        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);

        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );


        String numThread = "2";
        int numThreads = Integer.parseInt(numThread);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics1 = topics.split(",");
        for (String topic : topics1) {
            System.out.println(""+topic+"");
            topicMap.put(topic, numThreads);
        }

        //JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap);


        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {

            public String call(Tuple2<String, String> tuple2) {
                System.out.println(""+tuple2._2()+"");
                return tuple2._2();
            }
        });

        int i=0;
        i=1;

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            public Iterable<String> call(String x) {
                System.out.println(""+x+"");
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {

                    public Tuple2<String, Integer> call(String s) {
                        System.out.println(""+s+"");
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {

            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        wordCounts.print();

        // Start the computation
        jssc.start();
        jssc.awaitTermination();

    }

}
-----------------------------------------------------
maven依赖坐标如下:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.5.0</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.5.0</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.5.0</version>
</dependency>

  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-zeromq_2.10</artifactId>
      <version>1.5.0</version>
  </dependency>


-------------------------------
连接spark集群正常。当使用kafka终端命令生成数据时,spark运行报错。
kafka生成数据命令如下:
kafka-console-producer  --broker-list t1s1:9092,t1s2:9092,t1s3:9092 --topic titanic
------------------------------------------------------------------------------------------------------
spark运行报错,错误如下:
15/10/16 14:19:38 INFO BlockManager: Removing RDD 217
15/10/16 14:19:38 INFO MapPartitionsRDD: Removing RDD 216 from persistence list
15/10/16 14:19:38 INFO BlockManager: Removing RDD 216
15/10/16 14:19:38 INFO KafkaRDD: Removing RDD 215 from persistence list
15/10/16 14:19:38 INFO BlockManager: Removing RDD 215
15/10/16 14:19:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/10/16 14:19:38 INFO InputInfoTracker: remove old batch metadata: 1444976376000 ms
15/10/16 14:19:39 INFO JobScheduler: Added jobs for time 1444976379000 ms
15/10/16 14:19:39 INFO JobScheduler: Starting job streaming job 1444976379000 ms.0 from job set of time 1444976379000 ms
15/10/16 14:19:39 INFO SparkContext: Starting job: print at Test1.java:140
15/10/16 14:19:39 INFO DAGScheduler: Registering RDD 228 (mapToPair at Test1.java:126)
15/10/16 14:19:39 INFO DAGScheduler: Got job 90 (print at Test1.java:140) with 1 output partitions
15/10/16 14:19:39 INFO DAGScheduler: Final stage: ResultStage 181(print at Test1.java:140)
15/10/16 14:19:39 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 180)
15/10/16 14:19:39 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 180)
15/10/16 14:19:39 INFO DAGScheduler: Submitting ShuffleMapStage 180 (MapPartitionsRDD[228] at mapToPair at Test1.java:126), which has no missing parents
15/10/16 14:19:39 INFO MemoryStore: ensureFreeSpace(4656) called with curMem=260042, maxMem=994584821
15/10/16 14:19:39 INFO MemoryStore: Block broadcast_135 stored as values in memory (estimated size 4.5 KB, free 948.3 MB)
15/10/16 14:19:39 INFO MemoryStore: ensureFreeSpace(2500) called with curMem=264698, maxMem=994584821
15/10/16 14:19:39 INFO MemoryStore: Block broadcast_135_piece0 stored as bytes in memory (estimated size 2.4 KB, free 948.3 MB)
15/10/16 14:19:39 INFO BlockManagerInfo: Added broadcast_135_piece0 in memory on 192.168.1.127:60437 (size: 2.4 KB, free: 948.4 MB)
15/10/16 14:19:39 INFO SparkContext: Created broadcast 135 from broadcast at DAGScheduler.scala:861
15/10/16 14:19:39 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 180 (MapPartitionsRDD[228] at mapToPair at Test1.java:126)
15/10/16 14:19:39 INFO TaskSchedulerImpl: Adding task set 180.0 with 1 tasks
15/10/16 14:19:39 INFO TaskSetManager: Starting task 0.0 in stage 180.0 (TID 135, 192.168.3.202, ANY, 2109 bytes)
15/10/16 14:19:39 INFO BlockManagerInfo: Added broadcast_135_piece0 in memory on 192.168.3.202:45837 (size: 2.4 KB, free: 530.2 MB)
15/10/16 14:19:39 WARN TaskSetManager: Lost task 0.0 in stage 180.0 (TID 135, 192.168.3.202): java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
        at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:85)
        at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
        at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:35)
        at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:46)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
        at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
        at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
        at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:170)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:155)
        at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:135)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 31 more
15/10/16 14:19:39 INFO TaskSetManager: Starting task 0.1 in stage 180.0 (TID 136, 192.168.3.201, ANY, 2109 bytes)
15/10/16 14:19:39 ERROR TaskSchedulerImpl: Lost executor 1 on 192.168.3.202: remote Rpc client disassociated
15/10/16 14:19:39 INFO TaskSetManager: Re-queueing tasks for 1 from TaskSet 180.0
15/10/16 14:19:39 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@192.168.3.202:59412] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor updated: app-20151016141558-0028/1 is now EXITED (Command exited with code 50)
15/10/16 14:19:39 INFO SparkDeploySchedulerBackend: Executor app-20151016141558-0028/1 removed: Command exited with code 50
15/10/16 14:19:39 INFO DAGScheduler: Executor lost: 1 (epoch 45)
15/10/16 14:19:39 INFO SparkDeploySchedulerBackend: Asked to remove non-existent executor 1
15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor added: app-20151016141558-0028/3 on worker-20151013162724-192.168.3.202-14636 (192.168.3.202:14636) with 24 cores
15/10/16 14:19:39 INFO SparkDeploySchedulerBackend: Granted executor ID app-20151016141558-0028/3 on hostPort 192.168.3.202:14636 with 24 cores, 1024.0 MB RAM
15/10/16 14:19:39 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.
15/10/16 14:19:39 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, 192.168.3.202, 45837)
15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor updated: app-20151016141558-0028/3 is now LOADING
15/10/16 14:19:39 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor updated: app-20151016141558-0028/3 is now RUNNING
15/10/16 14:19:40 INFO JobScheduler: Added jobs for time 1444976380000 ms
15/10/16 14:19:41 INFO JobScheduler: Added jobs for time 1444976381000 ms
15/10/16 14:19:42 INFO ContextCleaner: Cleaned accumulator 104
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_102_piece0 on 192.168.1.127:60437 in memory (size: 2.4 KB, free: 948.4 MB)
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_102_piece0 on 192.168.3.203:24816 in memory (size: 2.4 KB, free: 530.3 MB)
15/10/16 14:19:42 INFO JobScheduler: Added jobs for time 1444976382000 ms
15/10/16 14:19:42 INFO ContextCleaner: Cleaned accumulator 103
15/10/16 14:19:42 INFO ContextCleaner: Cleaned shuffle 34
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_101_piece0 on 192.168.1.127:60437 in memory (size: 1498.0 B, free: 948.4 MB)
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_101_piece0 on 192.168.3.201:13831 in memory (size: 1498.0 B, free: 530.3 MB)
15/10/16 14:19:42 INFO ContextCleaner: Cleaned accumulator 102
-------------------------------------------------------------------------------
问题:java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
看报错信息是找不到这个类,但是我maven里面好像有这个依赖。







已有(5)人评论

跳转到指定楼层
langke93 发表于 2015-10-16 15:15:14
看看本地库中是否有
回复

使用道具 举报

titanic 发表于 2015-10-16 15:57:11
langke93 发表于 2015-10-16 15:15
看看本地库中是否有

本地库中 .m2/repository/com/yammer/metrics 目录中没有这个jar包,但是有 metrics-core 和 metrics-parent 这两个文件夹。在 pom.xml 文件中也添加了


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-zeromq_2.10</artifactId>
    <version>1.5.0</version>
</dependency>

依赖还是不行
回复

使用道具 举报

langke93 发表于 2015-10-16 20:41:00
titanic 发表于 2015-10-16 15:57
本地库中.m2/repository/com/yammer/metrics目录中没有这个jar包,但是有metrics-core和metrics-parent这 ...

自己下载放到本地库中试试,本地库中找不到肯定报错的
回复

使用道具 举报

轩辕依梦Q 发表于 2015-10-20 10:00:07

楼主是不是这个依赖没有加呢
metrty.png
回复

使用道具 举报

titanic 发表于 2015-10-21 10:35:01
轩辕依梦Q 发表于 2015-10-20 10:00
楼主是不是这个依赖没有加呢

这个依赖坐标已经加了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条