titanic 发表于 2015-10-16 14:42:11

spark streaming kafka运行出错,找不到 com/yammer/metrics/Metrics

kafka是CDH里面的,版本是0.8.2.0-1.kafka1.3.0.p0.29,CDH是5.4.2版本,hadoop是2.6.0,spark是1.5版本.ubuntu系统使用intellij14,已经与spark集群配置了hosts

以下是运行代码
---------------------------------------------------------------------------------
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;


import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;


import java.util.HashSet;


import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;



public class Test1 {
    // Pre-compiled single-space delimiter; init() uses it to split each Kafka message into words.
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
      init();
    }

    private static void init()
    {
      // TODO Auto-generated method stub



      String brokers = "192.168.3.201:9092,192.168.3.202:9092,192.168.3.203:9092";
      String topics = "titanic";
//      String group = "HJW.T2";
//      String zookeeper = "127.0.0.1:2181";

      // Create context with 2 second batch interval
      SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount_test1");
      sparkConf.setJars(new String[]{"/home/titanic/soft/intelijWorkspace/spark/com-hadoop-spark/target/com-hadoop-spark-1.0-SNAPSHOT-jar-with-dependencies.jar"});
      sparkConf.setMaster("spark://t1m1.tcloud.com:7077");
      //String[] jars = {"E:/download/eclipse-jee-juno-SR2-win32/workspace/test_spark_kafka/build/test_spark_kafka.jar"};
      //sparkConf.setJars(jars);
      JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));


      HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
      HashMap<String, String> kafkaParams = new HashMap<String, String>();
      kafkaParams.put("metadata.broker.list", brokers);

      // Create direct kafka stream with brokers and topics
      JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
      );


      String numThread = "2";
      int numThreads = Integer.parseInt(numThread);
      Map<String, Integer> topicMap = new HashMap<String, Integer>();
      String[] topics1 = topics.split(",");
      for (String topic : topics1) {
            System.out.println("【"+topic+"】");
            topicMap.put(topic, numThreads);
      }

      //JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper, group, topicMap);


      // Get the lines, split them into words, count the words and print
      JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {

            public String call(Tuple2<String, String> tuple2) {
                System.out.println("【"+tuple2._2()+"】");
                return tuple2._2();
            }
      });

      int i=0;
      i=1;

      JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            public Iterable<String> call(String x) {
                System.out.println("【"+x+"】");
                return Lists.newArrayList(SPACE.split(x));
            }
      });

      JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {

                  public Tuple2<String, Integer> call(String s) {
                        System.out.println("【"+s+"】");
                        return new Tuple2<String, Integer>(s, 1);
                  }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {

            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
      });

      wordCounts.print();

      // Start the computation
      jssc.start();
      jssc.awaitTermination();

    }

}
-----------------------------------------------------
maven依赖坐标如下
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.5.0</version>
</dependency>

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-zeromq_2.10</artifactId>
      <version>1.5.0</version>
</dependency>


-------------------------------
连接spark集群正常。当使用kafka终端命令生成数据时spark运行报错。
kafka生成数据命令如下:
kafka-console-producer --broker-list t1s1:9092,t1s2:9092,t1s3:9092 --topic titanic
------------------------------------------------------------------------------------------------------
spark运行报错,错误如下:
15/10/16 14:19:38 INFO BlockManager: Removing RDD 217
15/10/16 14:19:38 INFO MapPartitionsRDD: Removing RDD 216 from persistence list
15/10/16 14:19:38 INFO BlockManager: Removing RDD 216
15/10/16 14:19:38 INFO KafkaRDD: Removing RDD 215 from persistence list
15/10/16 14:19:38 INFO BlockManager: Removing RDD 215
15/10/16 14:19:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/10/16 14:19:38 INFO InputInfoTracker: remove old batch metadata: 1444976376000 ms
15/10/16 14:19:39 INFO JobScheduler: Added jobs for time 1444976379000 ms
15/10/16 14:19:39 INFO JobScheduler: Starting job streaming job 1444976379000 ms.0 from job set of time 1444976379000 ms
15/10/16 14:19:39 INFO SparkContext: Starting job: print at Test1.java:140
15/10/16 14:19:39 INFO DAGScheduler: Registering RDD 228 (mapToPair at Test1.java:126)
15/10/16 14:19:39 INFO DAGScheduler: Got job 90 (print at Test1.java:140) with 1 output partitions
15/10/16 14:19:39 INFO DAGScheduler: Final stage: ResultStage 181(print at Test1.java:140)
15/10/16 14:19:39 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 180)
15/10/16 14:19:39 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 180)
15/10/16 14:19:39 INFO DAGScheduler: Submitting ShuffleMapStage 180 (MapPartitionsRDD at mapToPair at Test1.java:126), which has no missing parents
15/10/16 14:19:39 INFO MemoryStore: ensureFreeSpace(4656) called with curMem=260042, maxMem=994584821
15/10/16 14:19:39 INFO MemoryStore: Block broadcast_135 stored as values in memory (estimated size 4.5 KB, free 948.3 MB)
15/10/16 14:19:39 INFO MemoryStore: ensureFreeSpace(2500) called with curMem=264698, maxMem=994584821
15/10/16 14:19:39 INFO MemoryStore: Block broadcast_135_piece0 stored as bytes in memory (estimated size 2.4 KB, free 948.3 MB)
15/10/16 14:19:39 INFO BlockManagerInfo: Added broadcast_135_piece0 in memory on 192.168.1.127:60437 (size: 2.4 KB, free: 948.4 MB)
15/10/16 14:19:39 INFO SparkContext: Created broadcast 135 from broadcast at DAGScheduler.scala:861
15/10/16 14:19:39 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 180 (MapPartitionsRDD at mapToPair at Test1.java:126)
15/10/16 14:19:39 INFO TaskSchedulerImpl: Adding task set 180.0 with 1 tasks
15/10/16 14:19:39 INFO TaskSetManager: Starting task 0.0 in stage 180.0 (TID 135, 192.168.3.202, ANY, 2109 bytes)
15/10/16 14:19:39 INFO BlockManagerInfo: Added broadcast_135_piece0 in memory on 192.168.3.202:45837 (size: 2.4 KB, free: 530.2 MB)
15/10/16 14:19:39 WARN TaskSetManager: Lost task 0.0 in stage 180.0 (TID 135, 192.168.3.202): java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
        at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:85)
        at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
        at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:35)
        at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:46)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
        at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
        at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
        at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
        at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:170)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:155)
        at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:135)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 31 more
15/10/16 14:19:39 INFO TaskSetManager: Starting task 0.1 in stage 180.0 (TID 136, 192.168.3.201, ANY, 2109 bytes)
15/10/16 14:19:39 ERROR TaskSchedulerImpl: Lost executor 1 on 192.168.3.202: remote Rpc client disassociated
15/10/16 14:19:39 INFO TaskSetManager: Re-queueing tasks for 1 from TaskSet 180.0
15/10/16 14:19:39 WARN ReliableDeliverySupervisor: Association with remote system has failed, address is now gated for ms. Reason:
15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor updated: app-20151016141558-0028/1 is now EXITED (Command exited with code 50)
15/10/16 14:19:39 INFO SparkDeploySchedulerBackend: Executor app-20151016141558-0028/1 removed: Command exited with code 50
15/10/16 14:19:39 INFO DAGScheduler: Executor lost: 1 (epoch 45)
15/10/16 14:19:39 INFO SparkDeploySchedulerBackend: Asked to remove non-existent executor 1
15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor added: app-20151016141558-0028/3 on worker-20151013162724-192.168.3.202-14636 (192.168.3.202:14636) with 24 cores
15/10/16 14:19:39 INFO SparkDeploySchedulerBackend: Granted executor ID app-20151016141558-0028/3 on hostPort 192.168.3.202:14636 with 24 cores, 1024.0 MB RAM
15/10/16 14:19:39 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.
15/10/16 14:19:39 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, 192.168.3.202, 45837)
15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor updated: app-20151016141558-0028/3 is now LOADING
15/10/16 14:19:39 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor updated: app-20151016141558-0028/3 is now RUNNING
15/10/16 14:19:40 INFO JobScheduler: Added jobs for time 1444976380000 ms
15/10/16 14:19:41 INFO JobScheduler: Added jobs for time 1444976381000 ms
15/10/16 14:19:42 INFO ContextCleaner: Cleaned accumulator 104
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_102_piece0 on 192.168.1.127:60437 in memory (size: 2.4 KB, free: 948.4 MB)
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_102_piece0 on 192.168.3.203:24816 in memory (size: 2.4 KB, free: 530.3 MB)
15/10/16 14:19:42 INFO JobScheduler: Added jobs for time 1444976382000 ms
15/10/16 14:19:42 INFO ContextCleaner: Cleaned accumulator 103
15/10/16 14:19:42 INFO ContextCleaner: Cleaned shuffle 34
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_101_piece0 on 192.168.1.127:60437 in memory (size: 1498.0 B, free: 948.4 MB)
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_101_piece0 on 192.168.3.201:13831 in memory (size: 1498.0 B, free: 530.3 MB)
15/10/16 14:19:42 INFO ContextCleaner: Cleaned accumulator 102
-------------------------------------------------------------------------------
问题:java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
看报错信息是找不到这个类。但是我maven里面好像有这个依赖。







langke93 发表于 2015-10-16 15:15:14

看看本地库中是否有

titanic 发表于 2015-10-16 15:57:11

langke93 发表于 2015-10-16 15:15
看看本地库中是否有

本地库 .m2/repository/com/yammer/metrics 目录中没有这个jar包,但是有 metrics-core 和 metrics-parent 这两个文件夹。在pom.xml文件中也添加了


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-zeromq_2.10</artifactId>
    <version>1.5.0</version>
</dependency>

依赖还是不行

langke93 发表于 2015-10-16 20:41:00

titanic 发表于 2015-10-16 15:57
本地库中.m2/repository/com/yammer/metrics目录中没有这个jar包,但是有metrics-core和metrics-parent这 ...

自己下载放到本地库中试试,本地库中找不到肯定报错的

轩辕依梦Q 发表于 2015-10-20 10:00:07


楼主是不是这个依赖没有加呢

titanic 发表于 2015-10-21 10:35:01

轩辕依梦Q 发表于 2015-10-20 10:00
楼主是不是这个依赖没有加呢

这个依赖坐标已经加了
页: [1]
查看完整版本: spark streaming kafka运行出错,找不到 com/yammer/metrics/Metrics