环境说明:Kafka 是 CDH 自带的,版本为 0.8.2.0-1.kafka1.3.0.p0.29;CDH 版本为 5.4.2,Hadoop 为 2.6.0,Spark 为 1.5。开发机为 Ubuntu 系统,使用 IntelliJ IDEA 14,并且已经在 hosts 中配置了 Spark 集群各节点的主机名。
以下是运行的代码:
---------------------------------------------------------------------------------
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.util.HashSet;
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
public class Test1 {
private static final Pattern SPACE = Pattern.compile(" ");
/**
 * Program entry point; hands control straight to {@code init()}.
 *
 * @param args command-line arguments (not read by this program)
 */
public static void main(String[] args)
{
    Test1.init();
}
/**
 * Builds and runs a Spark Streaming word count over a Kafka direct stream.
 *
 * <p>The job reads the values of the messages published to the configured
 * topic(s), splits each value on single spaces, counts the occurrences of
 * every word within each batch and prints the first counts of each batch.
 * The method blocks in {@code awaitTermination()} after starting the context.
 */
private static void init()
{
    // Kafka brokers used to look up topic metadata, and the comma-separated topics to read.
    String brokers = "192.168.3.201:9092,192.168.3.202:9092,192.168.3.203:9092";
    String topics = "titanic";

    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount_test1");
    // Application jar handed to Spark via setJars; by its name it is a jar-with-dependencies build.
    sparkConf.setJars(new String[]{"/home/titanic/soft/intelijWorkspace/spark/com-hadoop-spark/target/com-hadoop-spark-1.0-SNAPSHOT-jar-with-dependencies.jar"});
    sparkConf.setMaster("spark://t1m1.tcloud.com:7077");

    // Create the context with a 1 second batch interval (Duration takes milliseconds).
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

    // Split the topic list once and reuse it for both the stream and the trace output below.
    String[] topicNames = topics.split(",");
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topicNames));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Trace the subscribed topics.
    for (String topic : topicNames) {
        System.out.println("【"+topic+"】");
    }

    // Keep only the message value of each (key, value) record.
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            System.out.println("【"+tuple2._2()+"】");
            return tuple2._2();
        }
    });

    // Split every line into words on single spaces.
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String x) {
            System.out.println("【"+x+"】");
            return Lists.newArrayList(SPACE.split(x));
        }
    });

    // Pair every word with 1 and sum the ones per word within each batch.
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                System.out.println("【"+s+"】");
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
    wordCounts.print();

    // Start the computation and block until the streaming context terminates.
    jssc.start();
    jssc.awaitTermination();
}
}-----------------------------------------------------maven依赖坐标如下
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-zeromq_2.10</artifactId>
    <version>1.5.0</version>
</dependency>
<!--
  Kafka's consumer classes (reached through spark-streaming-kafka) load
  com.yammer.metrics.Metrics at runtime. Declaring metrics-core explicitly
  makes sure it is packaged into the jar-with-dependencies that is shipped
  to the executors.
  NOTE(review): 2.2.0 is the version Kafka 0.8.2.x declares; confirm with
  `mvn dependency:tree` and rebuild the assembly jar before resubmitting.
-->
<dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.2.0</version>
</dependency>
-------------------------------链接spark集群正常.当用使用kafka终端命令生成数据时spark运行报错kafka生成数据命令如下kafka-console-producer --broker-list t1s1:9092,t1s2:9092,t1s3:9092 --topic titanic------------------------------------------------------------------------------------------------------spark运行报错,错误如下15/10/16 14:19:38 INFO BlockManager: Removing RDD 21715/10/16 14:19:38 INFO MapPartitionsRDD: Removing RDD 216 from persistence list15/10/16 14:19:38 INFO BlockManager: Removing RDD 21615/10/16 14:19:38 INFO KafkaRDD: Removing RDD 215 from persistence list15/10/16 14:19:38 INFO BlockManager: Removing RDD 21515/10/16 14:19:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()15/10/16 14:19:38 INFO InputInfoTracker: remove old batch metadata: 1444976376000 ms15/10/16 14:19:39 INFO JobScheduler: Added jobs for time 1444976379000 ms15/10/16 14:19:39 INFO JobScheduler: Starting job streaming job 1444976379000 ms.0 from job set of time 1444976379000 ms15/10/16 14:19:39 INFO SparkContext: Starting job: print at Test1.java:14015/10/16 14:19:39 INFO DAGScheduler: Registering RDD 228 (mapToPair at Test1.java:126)15/10/16 14:19:39 INFO DAGScheduler: Got job 90 (print at Test1.java:140) with 1 output partitions15/10/16 14:19:39 INFO DAGScheduler: Final stage: ResultStage 181(print at Test1.java:140)15/10/16 14:19:39 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 180)15/10/16 14:19:39 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 180)15/10/16 14:19:39 INFO DAGScheduler: Submitting ShuffleMapStage 180 (MapPartitionsRDD[228] at mapToPair at Test1.java:126), which has no missing parents15/10/16 14:19:39 INFO MemoryStore: ensureFreeSpace(4656) called with curMem=260042, maxMem=99458482115/10/16 14:19:39 INFO MemoryStore: Block broadcast_135 stored as values in memory (estimated size 4.5 KB, free 948.3 MB)15/10/16 14:19:39 INFO MemoryStore: ensureFreeSpace(2500) called with curMem=264698, maxMem=99458482115/10/16 14:19:39 INFO MemoryStore: Block broadcast_135_piece0 stored 
as bytes in memory (estimated size 2.4 KB, free 948.3 MB)15/10/16 14:19:39 INFO BlockManagerInfo: Added broadcast_135_piece0 in memory on 192.168.1.127:60437 (size: 2.4 KB, free: 948.4 MB)15/10/16 14:19:39 INFO SparkContext: Created broadcast 135 from broadcast at DAGScheduler.scala:86115/10/16 14:19:39 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 180 (MapPartitionsRDD[228] at mapToPair at Test1.java:126)15/10/16 14:19:39 INFO TaskSchedulerImpl: Adding task set 180.0 with 1 tasks15/10/16 14:19:39 INFO TaskSetManager: Starting task 0.0 in stage 180.0 (TID 135, 192.168.3.202, ANY, 2109 bytes)15/10/16 14:19:39 INFO BlockManagerInfo: Added broadcast_135_piece0 in memory on 192.168.3.202:45837 (size: 2.4 KB, free: 530.2 MB)15/10/16 14:19:39 WARN TaskSetManager: Lost task 0.0 in stage 180.0 (TID 135, 192.168.3.202): java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:85) at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26) at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:35) at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:46) at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59) at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59) at kafka.utils.Pool.getAndMaybePut(Pool.scala:61) at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63) at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39) at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:170) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:155) at 
org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:135) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 
31 more15/10/16 14:19:39 INFO TaskSetManager: Starting task 0.1 in stage 180.0 (TID 136, 192.168.3.201, ANY, 2109 bytes)15/10/16 14:19:39 ERROR TaskSchedulerImpl: Lost executor 1 on 192.168.3.202: remote Rpc client disassociated15/10/16 14:19:39 INFO TaskSetManager: Re-queueing tasks for 1 from TaskSet 180.015/10/16 14:19:39 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@192.168.3.202:59412] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor updated: app-20151016141558-0028/1 is now EXITED (Command exited with code 50)15/10/16 14:19:39 INFO SparkDeploySchedulerBackend: Executor app-20151016141558-0028/1 removed: Command exited with code 5015/10/16 14:19:39 INFO DAGScheduler: Executor lost: 1 (epoch 45)15/10/16 14:19:39 INFO SparkDeploySchedulerBackend: Asked to remove non-existent executor 115/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor added: app-20151016141558-0028/3 on worker-20151013162724-192.168.3.202-14636 (192.168.3.202:14636) with 24 cores15/10/16 14:19:39 INFO SparkDeploySchedulerBackend: Granted executor ID app-20151016141558-0028/3 on hostPort 192.168.3.202:14636 with 24 cores, 1024.0 MB RAM15/10/16 14:19:39 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.15/10/16 14:19:39 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, 192.168.3.202, 45837)15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor updated: app-20151016141558-0028/3 is now LOADING15/10/16 14:19:39 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor15/10/16 14:19:39 INFO AppClient$ClientEndpoint: Executor updated: app-20151016141558-0028/3 is now RUNNING15/10/16 14:19:40 INFO JobScheduler: Added jobs for time 1444976380000 ms15/10/16 14:19:41 INFO JobScheduler: Added jobs for time 1444976381000 ms15/10/16 14:19:42 INFO ContextCleaner: Cleaned accumulator 10415/10/16 14:19:42 INFO 
BlockManagerInfo: Removed broadcast_102_piece0 on 192.168.1.127:60437 in memory (size: 2.4 KB, free: 948.4 MB)
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_102_piece0 on 192.168.3.203:24816 in memory (size: 2.4 KB, free: 530.3 MB)
15/10/16 14:19:42 INFO JobScheduler: Added jobs for time 1444976382000 ms
15/10/16 14:19:42 INFO ContextCleaner: Cleaned accumulator 103
15/10/16 14:19:42 INFO ContextCleaner: Cleaned shuffle 34
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_101_piece0 on 192.168.1.127:60437 in memory (size: 1498.0 B, free: 948.4 MB)
15/10/16 14:19:42 INFO BlockManagerInfo: Removed broadcast_101_piece0 on 192.168.3.201:13831 in memory (size: 1498.0 B, free: 530.3 MB)
15/10/16 14:19:42 INFO ContextCleaner: Cleaned accumulator 102
-------------------------------------------------------------------------------
问题:java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
从报错信息看,是 executor 在运行任务时找不到 com.yammer.metrics.Metrics 这个类。但是我的 Maven 依赖里好像已经有这个依赖了。
|
|