分享

[求助] Storm 读取 Kafka 数据报错

ltz 发表于 2016-5-30 18:53:30 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 24493
程序代码

[mw_shl_code=applescript,true]package org.open.storm.topology;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

/**
* Created by root on 16-5-30.
*/

public class MyKafkaTopology {

    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                            OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            String[] words = line.split("\\s+");
            for(String word : words) {
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }

    }

    public static class WordCounter extends BaseRichBolt {

        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                            OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            AtomicInteger ai = this.counterMap.get(word);
            if(ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
        }

        @Override
        public void cleanup() {
            Iterator<Map.Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while(iter.hasNext()) {
                Map.Entry<String, AtomicInteger> entry = iter.next();
            }

        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        String zks = "hadoop:2181";
        String topic = "storm_topic";
        String zkRoot = "/storm";
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
       // spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[]{"hadoop"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 1);
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 1).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();

        String name = MyKafkaTopology.class.getSimpleName();

        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, builder.createTopology());
        Thread.sleep(60000);
        cluster.shutdown();

    }
}
[/mw_shl_code]
错误信息:
[mw_shl_code=applescript,true]239932 [Thread-15-kafka-reader-executor[2 2]-EventThread] INFO  o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
239937 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.d.executor - Opened spout kafka-reader:(2)
239945 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.d.executor - Activating spout kafka-reader:(2)
239945 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Refreshing partition manager connections
239953 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{topic=flume_topic, partitionMap={0=hadoop:9092}}
239956 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.KafkaUtils - Task [1/1] assigned [Partition{host=hadoop:9092, topic=flume_topic, partition=0}]
239957 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Deleted partition managers: []
239957 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=hadoop:9092, topic=flume_topic, partition=0}]
240289 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.PartitionManager - Read partition information from: /storm/word/partition_0  --> null
log4j:WARN No appenders could be found for logger (kafka.consumer.SimpleConsumer).
log4j:WARN Please initialize the log4j system properly.
240663 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.PartitionManager - No partition information found, using configuration to determine offset
240663 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.PartitionManager - Last commit offset from zookeeper: 0
240664 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind latest offset 0, resetting to startOffsetTime=-2
240664 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.PartitionManager - Starting Kafka hadoop:0 from offset 0
240668 [Thread-15-kafka-reader-executor[2 2]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing
240689 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240690 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240690 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42367
240691 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd330003 at /127.0.0.1:42367
240693 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd330003 with negotiated timeout 20000 for client /127.0.0.1:42367
240696 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd330003, negotiated timeout = 20000
240697 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240732 [Thread-15-kafka-reader-executor[2 2]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.nio.BufferUnderflowException
        at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:201) ~[storm-kafka-1.0.0.jar:1.0.0]
        at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[storm-kafka-1.0.0.jar:1.0.0]
        at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[storm-kafka-1.0.0.jar:1.0.0]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$fn__7877$fn__7892$fn__7923.invoke(executor.clj:647) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.0.jar:1.0.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145) ~[?:1.7.0_79]
        at java.nio.ByteBuffer.get(ByteBuffer.java:694) ~[?:1.7.0_79]
        at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.TopicData$.readFrom(FetchResponse.scala:95) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.immutable.Range.foreach(Range.scala:141) ~[scala-library-2.10.5.jar:?]
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[scala-library-2.10.5.jar:?]
        at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.9.0.1.jar:?]
        at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[storm-kafka-1.0.0.jar:1.0.0]
        ... 7 more
240737 [Thread-15-kafka-reader-executor[2 2]] ERROR o.a.s.d.executor -
java.lang.RuntimeException: java.nio.BufferUnderflowException
        at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:201) ~[storm-kafka-1.0.0.jar:1.0.0]
        at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[storm-kafka-1.0.0.jar:1.0.0]
        at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[storm-kafka-1.0.0.jar:1.0.0]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$fn__7877$fn__7892$fn__7923.invoke(executor.clj:647) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.0.jar:1.0.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145) ~[?:1.7.0_79]
        at java.nio.ByteBuffer.get(ByteBuffer.java:694) ~[?:1.7.0_79]
        at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.TopicData$.readFrom(FetchResponse.scala:95) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.immutable.Range.foreach(Range.scala:141) ~[scala-library-2.10.5.jar:?]
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[scala-library-2.10.5.jar:?]
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[scala-library-2.10.5.jar:?]
        at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:168) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) ~[kafka_2.10-0.9.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.9.0.1.jar:?]
        at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[storm-kafka-1.0.0.jar:1.0.0]
        ... 7 more
240803 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240804 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240804 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42368
240805 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd330006 at /127.0.0.1:42368
240807 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd330006 with negotiated timeout 20000 for client /127.0.0.1:42368
240807 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd330006, negotiated timeout = 20000
240808 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240906 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240907 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42369
240907 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240911 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd33000c at /127.0.0.1:42369
240913 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd33000c with negotiated timeout 20000 for client /127.0.0.1:42369
240914 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd33000c, negotiated timeout = 20000
240914 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240925 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240925 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240926 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42370
240928 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd330000 at /127.0.0.1:42370
240930 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd330000, negotiated timeout = 20000
240931 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240931 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
240932 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
240932 [main-EventThread] INFO  o.a.s.zookeeper - Zookeeper state update: :connected:none
240933 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd330000 with negotiated timeout 20000 for client /127.0.0.1:42370
240934 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:42371
240935 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to renew session 0x1550128fd330008 at /127.0.0.1:42371
240937 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1550128fd330008 with negotiated timeout 20000 for client /127.0.0.1:42371
240938 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1550128fd330008, negotiated timeout = 20000
240938 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: RECONNECTED
240959 [main] INFO  o.a.s.d.nimbus - Shutting down master
240962 [main-EventThread] INFO  o.a.s.zookeeper - hadoop gained leadership
240974 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
240977 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x1550128fd330003
240977 [Thread-15-kafka-reader-executor[2 2]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.0.jar:1.0.0]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.worker$fn__8546$fn__8547.invoke(worker.clj:758) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$mk_executor_data$fn__7765$fn__7766.invoke(executor.clj:271) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494) [storm-core-1.0.0.jar:1.0.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
240984 [main-EventThread] INFO  o.a.s.s.o.a.z.ClientCnxn - EventThread shut down
240995 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Session: 0x1550128fd330003 closed
240996 [Curator-Framework-0] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
240995 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxn - Closed socket connection for client /127.0.0.1:42367 which had sessionid 0x1550128fd330003
Disconnected from the target VM, address: '127.0.0.1:54738', transport: 'socket'

Process finished with exit code 1
[/mw_shl_code]

已有(2)人评论

跳转到指定楼层
xuanxufeng 发表于 2016-5-30 19:07:55
回复

使用道具 举报

xw2016 发表于 2016-6-1 10:13:37
还没学到这。。。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条