程序代码
[mw_shl_code=applescript,true]package org.open.storm.topology;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by root on 16-5-30.
*/
public class MyKafkaTopology {
public static class KafkaWordSplitter extends BaseRichBolt {
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getString(0);
String[] words = line.split("\\s+");
for(String word : words) {
collector.emit(input, new Values(word, 1));
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static class WordCounter extends BaseRichBolt {
private static final long serialVersionUID = 886149197481637894L;
private OutputCollector collector;
private Map<String, AtomicInteger> counterMap;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.counterMap = new HashMap<String, AtomicInteger>();
}
@Override
public void execute(Tuple input) {
String word = input.getString(0);
int count = input.getInteger(1);
AtomicInteger ai = this.counterMap.get(word);
if(ai == null) {
ai = new AtomicInteger();
this.counterMap.put(word, ai);
}
ai.addAndGet(count);
collector.ack(input);
}
@Override
public void cleanup() {
Iterator<Map.Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
while(iter.hasNext()) {
Map.Entry<String, AtomicInteger> entry = iter.next();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
String zks = "hadoop:2181";
String topic = "storm_topic";
String zkRoot = "/storm";
String id = "word";
BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
// spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[]{"hadoop"});
spoutConf.zkPort = 2181;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 1);
builder.setBolt("word-splitter", new KafkaWordSplitter(), 1).shuffleGrouping("kafka-reader");
builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));
Config conf = new Config();
String name = MyKafkaTopology.class.getSimpleName();
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}
[/mw_shl_code]
错误信息:
[mw_shl_code=applescript,true]239932 [Thread-15-kafka-reader-executor[2 2]-EventThread] INFO o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
239937 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.d.executor - Opened spout kafka-reader:(2)
239945 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.d.executor - Activating spout kafka-reader:(2)
239945 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Refreshing partition manager connections
239953 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{topic=flume_topic, partitionMap={0=hadoop:9092}}
239956 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.KafkaUtils - Task [1/1] assigned [Partition{host=hadoop:9092, topic=flume_topic, partition=0}]
239957 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Deleted partition managers: []
239957 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=hadoop:9092, topic=flume_topic, partition=0}]
240289 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.PartitionManager - Read partition information from: /storm/word/partition_0 --> null
log4j:WARN No appenders could be found for logger (kafka.consumer.SimpleConsumer).
log4j:WARN Please initialize the log4j system properly.
240663 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.PartitionManager - No partition information found, using configuration to determine offset
240663 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.PartitionManager - Last commit offset from zookeeper: 0
240664 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind latest offset 0, resetting to startOffsetTime=-2
240664 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.PartitionManager - Starting Kafka hadoop:0 from offset 0
240668 [Thread-15-kafka-reader-executor[2 2]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing
240689 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240690 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240690 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42367
240691 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd330003 at /127.0.0.1:42367
240693 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd330003 with negotiated timeout 20000 for client /127.0.0.1:42367
240696 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd330003, negotiated timeout = 20000
240697 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240732 [Thread-15-kafka-reader-executor[2 2]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.nio.BufferUnderflowException
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:201) ~[storm-kafka-1.0.0.jar:1.0.0]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[storm-kafka-1.0.0.jar:1.0.0]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[storm-kafka-1.0.0.jar:1.0.0]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-1.0.0.jar:1.0.0]
at org.apache.storm.daemon.executor$fn__7877$fn__7892$fn__7923.invoke(executor.clj:647) ~[storm-core-1.0.0.jar:1.0.0]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.0.jar:1.0.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145) ~[?:1.7.0_79]
at java.nio.ByteBuffer.get(ByteBuffer.java:694) ~[?:1.7.0_79]
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.api.TopicData$.readFrom(FetchResponse.scala:95) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
at scala.collection.immutable.Range.foreach(Range.scala:141) ~[scala-library-2.10.5.jar:?]
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[scala-library-2.10.5.jar:?]
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.9.0.1.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[storm-kafka-1.0.0.jar:1.0.0]
... 7 more
240737 [Thread-15-kafka-reader-executor[2 2]] ERROR o.a.s.d.executor -
java.lang.RuntimeException: java.nio.BufferUnderflowException
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:201) ~[storm-kafka-1.0.0.jar:1.0.0]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[storm-kafka-1.0.0.jar:1.0.0]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[storm-kafka-1.0.0.jar:1.0.0]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-1.0.0.jar:1.0.0]
at org.apache.storm.daemon.executor$fn__7877$fn__7892$fn__7923.invoke(executor.clj:647) ~[storm-core-1.0.0.jar:1.0.0]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.0.jar:1.0.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145) ~[?:1.7.0_79]
at java.nio.ByteBuffer.get(ByteBuffer.java:694) ~[?:1.7.0_79]
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.api.TopicData$.readFrom(FetchResponse.scala:95) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
at scala.collection.immutable.Range.foreach(Range.scala:141) ~[scala-library-2.10.5.jar:?]
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[scala-library-2.10.5.jar:?]
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) ~[kafka_2.10-0.9.0.1.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.9.0.1.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[storm-kafka-1.0.0.jar:1.0.0]
... 7 more
240803 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240804 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240804 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42368
240805 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd330006 at /127.0.0.1:42368
240807 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd330006 with negotiated timeout 20000 for client /127.0.0.1:42368
240807 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd330006, negotiated timeout = 20000
240808 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240906 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240907 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42369
240907 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240911 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd33000c at /127.0.0.1:42369
240913 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd33000c with negotiated timeout 20000 for client /127.0.0.1:42369
240914 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd33000c, negotiated timeout = 20000
240914 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240925 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240925 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240926 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42370
240928 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd330000 at /127.0.0.1:42370
240930 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd330000, negotiated timeout = 20000
240931 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240931 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240932 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240932 [main-EventThread] INFO o.a.s.zookeeper - Zookeeper state update: :connected:none
240933 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd330000 with negotiated timeout 20000 for client /127.0.0.1:42370
240934 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42371
240935 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd330008 at /127.0.0.1:42371
240937 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd330008 with negotiated timeout 20000 for client /127.0.0.1:42371
240938 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd330008, negotiated timeout = 20000
240938 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240959 [main] INFO o.a.s.d.nimbus - Shutting down master
240962 [main-EventThread] INFO o.a.s.zookeeper - hadoop gained leadership
240974 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
240977 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x1550128fd330003
240977 [Thread-15-kafka-reader-executor[2 2]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.0.jar:1.0.0]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn__8546$fn__8547.invoke(worker.clj:758) [storm-core-1.0.0.jar:1.0.0]
at org.apache.storm.daemon.executor$mk_executor_data$fn__7765$fn__7766.invoke(executor.clj:271) [storm-core-1.0.0.jar:1.0.0]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494) [storm-core-1.0.0.jar:1.0.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
240984 [main-EventThread] INFO o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
240995 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x1550128fd330003 closed
240996 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
240995 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:42367 which had sessionid 0x1550128fd330003
Disconnected from the target VM, address: '127.0.0.1:54738', transport: 'socket'
Process finished with exit code 1
[/mw_shl_code]
|
|