分享

storm-kafka-0.8-plus 源码解析

52Pig 2014-10-19 14:23:06 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 53491
本帖最后由 52Pig 于 2014-10-19 14:22 编辑
阅读导读:
1.DynamicPartitionConnections对象有什么用?
2.register函数做了哪些事?
3._coordinator 是干嘛的?
4.kafkaOffset可以反映出哪些信息?
5.每个partition的读取状况可以通过什么获取?



准备,一些相关类GlobalPartitionInformation (storm.kafka.trident)记录partitionid和broker的关系

  1. GlobalPartitionInformation info = new GlobalPartitionInformation();
  2. info.addPartition(0, new Broker("10.1.110.24",9092));
  3. info.addPartition(0, new Broker("10.1.110.21",9092));
复制代码
可以静态的生成GlobalPartitionInformation,向上面代码一样 也可以动态的从zk获取,推荐这种方式 从zk获取就会用到DynamicBrokersReader
DynamicBrokersReader核心就是从zk上读出partition和broker的对应关系 操作zk都是使用curator框架核心函数
  1. /**
  2.      * Get all partitions with their current leaders
  3.      */
  4.     public GlobalPartitionInformation getBrokerInfo() {
  5.         GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
  6.         try {
  7.             int numPartitionsForTopic = getNumPartitions(); //从zk取得partition的数目
  8.             String brokerInfoPath = brokerPath();
  9.             for (int partition = 0; partition < numPartitionsForTopic; partition++) {
  10.                 int leader = getLeaderFor(partition); //从zk获取partition的leader broker
  11.                 String path = brokerInfoPath + "/" + leader;
  12.                 try {
  13.                     byte[] brokerData = _curator.getData().forPath(path);
  14.                     Broker hp = getBrokerHost(brokerData); //从zk获取broker的host:port
  15.                     globalPartitionInformation.addPartition(partition, hp);//生成GlobalPartitionInformation
  16.                 } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
  17.                     LOG.error("Node {} does not exist ", path);
  18.                 }
  19.             }
  20.         } catch (Exception e) {
  21.             throw new RuntimeException(e);
  22.         }
  23.         LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
  24.         return globalPartitionInformation;
  25.     }
复制代码
DynamicPartitionConnections
维护到每个broker的connection,并记录下每个broker上对应的partitions核心数据结构,为每个broker维持一个ConnectionInfo
  1. Map<Broker, ConnectionInfo> _connections = new HashMap();
复制代码
ConnectionInfo的定义,包含连接该broker的SimpleConsumer和记录partitions的set
  1. static class ConnectionInfo {
  2.         SimpleConsumer consumer;
  3.         Set<Integer> partitions = new HashSet();
  4.         public ConnectionInfo(SimpleConsumer consumer) {
  5.             this.consumer = consumer;
  6.         }
  7.     }
复制代码
核心函数,就是register
  1. public SimpleConsumer register(Broker host, int partition) {
  2.         if (!_connections.containsKey(host)) {
  3.             _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
  4.         }
  5.         ConnectionInfo info = _connections.get(host);
  6.         info.partitions.add(partition);
  7.         return info.consumer;
  8.     }
复制代码
PartitionManager
关键核心逻辑,用于管理一个partiiton的读取状态
先理解下面几个变量:
  1. Long _emittedToOffset;
  2. Long _committedTo;
  3. SortedSet<Long> _pending = new TreeSet<Long>();
  4. LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
复制代码
kafka对于一个partition,一定是从offset从小到大按顺序读的,并且这里为了保证不读丢数据,会定期的将当前状态即offset写入zk几个中间状态,从kafka读到的offset,_emittedToOffset 从kafka读到的messages会放入_waitingToEmit,放入这个list,我们就认为一定会被emit,所以emittedToOffset可以认为是从kafka读到的offset 已经成功处理的offset,lastCompletedOffset 由于message是要在storm里面处理的,其中是可能fail的,所以正在处理的offset是缓存在_pending中的 ,如果_pending为空,那么lastCompletedOffset=_emittedToOffset ;如果_pending不为空,那么lastCompletedOffset为pending list里面第一个offset,因为后面都还在等待ack。
  1. public long lastCompletedOffset() {
  2.         if (_pending.isEmpty()) {
  3.             return _emittedToOffset;
  4.         } else {
  5.             return _pending.first();
  6.         }
  7.     }
复制代码
已经写入zk的offset,_committedTo 我们需要定期将lastCompletedOffset,写入zk,否则crash后,我们不知道上次读到哪儿了 ,所以_committedTo <= lastCompletedOffset 完整过程
1. 初始化,关键就是注册partition,然后初始化offset,以知道从哪里开始读
  1. public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
  2.         _partition = id;
  3.         _connections = connections;
  4.         _spoutConfig = spoutConfig;
  5.         _topologyInstanceId = topologyInstanceId;
  6.         _consumer = connections.register(id.host, id.partition); //注册partition到connections,并生成simpleconsumer
  7.         _state = state;
  8.         _stormConf = stormConf;
  9.         String jsonTopologyId = null;
  10.         Long jsonOffset = null;
  11.         String path = committedPath();
  12.         try {
  13.             Map<Object, Object> json = _state.readJSON(path);
  14.             LOG.info("Read partition information from: " + path +  " --> " + json );
  15.             if (json != null) {
  16.                 jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
  17.                 jsonOffset = (Long) json.get("offset"); // 从zk中读出commited offset
  18.             }
  19.         } catch (Throwable e) {
  20.             LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
  21.         }
  22.         if (jsonTopologyId == null || jsonOffset == null) { // zk中没有记录,那么根据spoutConfig.startOffsetTime设置offset,Earliest或Latest
  23.             _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
  24.             LOG.info("No partition information found, using configuration to determine offset");
  25.         } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
  26.             _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
  27.             LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
  28.         } else {
  29.             _committedTo = jsonOffset;
  30.         }
  31.         _emittedToOffset = _committedTo; // 初始化时,中间状态都是一致的
  32.     }
复制代码
2. 从kafka读取messages,放到_waitingToEmit从kafka中读到数据ByteBufferMessageSet, 把需要emit的msg,MessageAndRealOffset,放到_waitingToEmit 把没完成的offset放到pending 更新emittedToOffset.
  1. private void fill() {
  2.         ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
  3.         for (MessageAndOffset msg : msgs) {
  4.             _pending.add(_emittedToOffset);
  5.             _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
  6.             _emittedToOffset = msg.nextOffset();
  7.         }
  8.     }
复制代码
其中fetch message的逻辑如下
  1. public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
  2.         ByteBufferMessageSet msgs = null;
  3.         String topic = config.topic;
  4.         int partitionId = partition.partition;
  5.         for (int errors = 0; errors < 2 && msgs == null; errors++) { // 容忍两次错误
  6.             FetchRequestBuilder builder = new FetchRequestBuilder();
  7.             FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
  8.                     clientId(config.clientId).build();
  9.             FetchResponse fetchResponse;
  10.             try {
  11.                 fetchResponse = consumer.fetch(fetchRequest);
  12.             } catch (Exception e) {
  13.                 if (e instanceof ConnectException) {
  14.                     throw new FailedFetchException(e);
  15.                 } else {
  16.                     throw new RuntimeException(e);
  17.                 }
  18.             }
  19.             if (fetchResponse.hasError()) { // 主要处理offset outofrange的case,通过getOffset从earliest或latest读
  20.                 KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
  21.                 if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
  22.                     long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
  23.                     LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
  24.                             "retrying with default start offset time from configuration. " +
  25.                             "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
  26.                     offset = startOffset;
  27.                 } else {
  28.                     String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
  29.                     LOG.error(message);
  30.                     throw new FailedFetchException(message);
  31.                 }
  32.             } else {
  33.                 msgs = fetchResponse.messageSet(topic, partitionId);
  34.             }
  35.         }
  36.         return msgs;
  37.     }
复制代码
3. emit msg
从_waitingToEmit中取到msg,转换成tuple,然后通过collector.emit发出去
  1. public EmitState next(SpoutOutputCollector collector) {
  2.         if (_waitingToEmit.isEmpty()) {
  3.             fill();
  4.         }
  5.         while (true) {
  6.             MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
  7.             if (toEmit == null) {
  8.                 return EmitState.NO_EMITTED;
  9.             }
  10.             Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
  11.             if (tups != null) {
  12.                 for (List<Object> tup : tups) {
  13.                     collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
  14.                 }
  15.                 break;
  16.             } else {
  17.                 ack(toEmit.offset);
  18.             }
  19.         }
  20.         if (!_waitingToEmit.isEmpty()) {
  21.             return EmitState.EMITTED_MORE_LEFT;
  22.         } else {
  23.             return EmitState.EMITTED_END;
  24.         }
  25.     }
复制代码
可以看看转换tuple的过程, 可以看到是通过kafkaConfig.scheme.deserialize来做转换.
  1. public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
  2.         Iterable<List<Object>> tups;
  3.         ByteBuffer payload = msg.payload();
  4.         ByteBuffer key = msg.key();
  5.         if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
  6.             tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
  7.         } else {
  8.             tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
  9.         }
  10.         return tups;
  11.     }
复制代码
所以你使用时,需要定义scheme逻辑:
  1. spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());
  2. public class TestMessageScheme implements Scheme {
  3.     private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);
  4.    
  5.     @Override
  6.     public List<Object> deserialize(byte[] bytes) {
  7.     try {
  8.         String msg = new String(bytes, "UTF-8");
  9.         return new Values(msg);
  10.     } catch (InvalidProtocolBufferException e) {
  11.          LOGGER.error("Cannot parse the provided message!");
  12.     }
  13.         return null;
  14.     }
  15.     @Override
  16.     public Fields getOutputFields() {
  17.         return new Fields("msg");
  18.     }
  19. }
复制代码
4. 定期的commit offset
  1. public void commit() {
  2.         long lastCompletedOffset = lastCompletedOffset();
  3.         if (lastCompletedOffset != lastCommittedOffset()) {
  4.             Map<Object, Object> data = ImmutableMap.builder()
  5.                     .put("topology", ImmutableMap.of("id", _topologyInstanceId,
  6.                             "name", _stormConf.get(Config.TOPOLOGY_NAME)))
  7.                     .put("offset", lastCompletedOffset)
  8.                     .put("partition", _partition.partition)
  9.                     .put("broker", ImmutableMap.of("host", _partition.host.host,
  10.                             "port", _partition.host.port))
  11.                     .put("topic", _spoutConfig.topic).build();
  12.             _state.writeJSON(committedPath(), data);
  13.             _committedTo = lastCompletedOffset;
  14.         } else {
  15.             LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
  16.         }
  17.     }
复制代码
5. 最后关注一下,fail时的处理首先作者没有cache message,而只是cache offset所以fail的时候,他是无法直接replay的,在他的注释里面写了,不这样做的原因是怕内存爆掉。所以他的做法是,当一个offset fail的时候, 直接将_emittedToOffset回滚到当前fail的这个offset 下次从Kafka fetch的时候会从_emittedToOffset开始读,这样做的好处就是依赖kafka做replay,问题就是会有重复问题 所以使用时,一定要考虑,是否可以接受重复问题。
  1. public void fail(Long offset) {
  2.         //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
  3.         // things might get crazy with lots of timeouts
  4.         if (_emittedToOffset > offset) {
  5.             _emittedToOffset = offset;
  6.             _pending.tailSet(offset).clear();
  7.         }
  8.     }
复制代码
KafkaSpout
1. 初始化
关键就是初始化DynamicPartitionConnections和_coordinator
  1. public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
  2.         _collector = collector;
  3.         Map stateConf = new HashMap(conf);
  4.         List<String> zkServers = _spoutConfig.zkServers;
  5.         if (zkServers == null) {
  6.             zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
  7.         }
  8.         Integer zkPort = _spoutConfig.zkPort;
  9.         if (zkPort == null) {
  10.             zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
  11.         }
  12.         stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
  13.         stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
  14.         stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
  15.         _state = new ZkState(stateConf);
  16.         _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
  17.         // using TransactionalState like this is a hack
  18.         int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
  19.         if (_spoutConfig.hosts instanceof StaticHosts) {
  20.             _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
  21.         } else {
  22.             _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
  23.         }
  24.     }
复制代码

_coordinator 是干嘛的?这很关键,因为我们一般都会开多个并发的kafkaspout,类似于high-level中的consumer group,如何保证这些并发的线程不冲突?使用和highlevel一样的思路,一个partition只会有一个spout消费,这样就避免处理麻烦的访问互斥问题(kafka做访问互斥很麻烦,试着想想) 是根据当前spout的task数和partition数来分配,task和partitioin的对应关系的,并且为每个partition建立
PartitionManager
这里首先看到totalTasks就是当前这个spout component的task size StaticCoordinator和ZkCoordinator的差别就是, 从StaticHost还是从Zk读到partition的信息,简单起见,看看StaticCoordinator实现:
  1. public class StaticCoordinator implements PartitionCoordinator {
  2.     Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
  3.     List<PartitionManager> _allManagers = new ArrayList();
  4.     public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
  5.         StaticHosts hosts = (StaticHosts) config.hosts;
  6.         List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(hosts.getPartitionInformation(), totalTasks, taskIndex);
  7.         for (Partition myPartition : myPartitions) {// 建立PartitionManager
  8.             _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
  9.         }
  10.         _allManagers = new ArrayList(_managers.values());
  11.     }
  12.     @Override
  13.     public List<PartitionManager> getMyManagedPartitions() {
  14.         return _allManagers;
  15.     }
  16.     public PartitionManager getManager(Partition partition) {
  17.         return _managers.get(partition);
  18.     }
  19. }
复制代码
其中分配的逻辑在calculatePartitionsForTask
  1. public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
  2.         Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
  3.         List<Partition> partitions = partitionInformation.getOrderedPartitions();
  4.         int numPartitions = partitions.size();
  5.         List<Partition> taskPartitions = new ArrayList<Partition>();
  6.         for (int i = taskIndex; i < numPartitions; i += totalTasks) {// 平均分配,
  7.             Partition taskPartition = partitions.get(i);
  8.             taskPartitions.add(taskPartition);
  9.         }
  10.         logPartitionMapping(totalTasks, taskIndex, taskPartitions);
  11.         return taskPartitions;
  12.     }
复制代码
2. nextTuple
逻辑写的比较tricky,其实只要从一个partition读成功一次 ,只所以要for,是当EmitState.NO_EMITTED时,需要遍历后面的partition以保证读成功一次
  1. @Override
  2.     public void nextTuple() {
  3.         List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
  4.         for (int i = 0; i < managers.size(); i++) {
  5.             // in case the number of managers decreased
  6.             _currPartitionIndex = _currPartitionIndex % managers.size(); //_currPartitionIndex初始为0,每次依次读一个partition
  7.             EmitState state = managers.get(_currPartitionIndex).next(_collector); //调用PartitonManager.next去emit数据
  8.             if (state != EmitState.EMITTED_MORE_LEFT) { //当EMITTED_MORE_LEFT时,还有数据,可以继续读,不需要+1
  9.                 _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
  10.             }
  11.             if (state != EmitState.NO_EMITTED) { //当EmitState.NO_EMITTED时,表明partition的数据已经读完,也就是没有读到数据,所以不能break
  12.                 break;
  13.             }
  14.         }
  15.         long now = System.currentTimeMillis();
  16.         if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
  17.             commit(); //定期commit
  18.         }
  19.     }
复制代码
定期commit的逻辑,遍历去commit每个PartitionManager
  1. private void commit() {
  2.         _lastUpdateMs = System.currentTimeMillis();
  3.         for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
  4.             manager.commit();
  5.         }
  6.     }
复制代码
3. Ack和Fail
直接调用PartitionManager
  1. @Override
  2.     public void ack(Object msgId) {
  3.         KafkaMessageId id = (KafkaMessageId) msgId;
  4.         PartitionManager m = _coordinator.getManager(id.partition);
  5.         if (m != null) {
  6.             m.ack(id.offset);
  7.         }
  8.     }
  9.     @Override
  10.     public void fail(Object msgId) {
  11.         KafkaMessageId id = (KafkaMessageId) msgId;
  12.         PartitionManager m = _coordinator.getManager(id.partition);
  13.         if (m != null) {
  14.             m.fail(id.offset);
  15.         }
  16.     }
复制代码
4. declareOutputFields
所以在scheme里面需要定义,deserialize和getOutputFields
  1. @Override
  2.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  3.         declarer.declare(_spoutConfig.scheme.getOutputFields());
  4.     }
复制代码
Metrics
再来看下Metrics,关键学习一下如何在storm里面加metrics 。在spout.open里面初始化了下面两个metrics
kafkaOffset
反映出每个partition的earliestTimeOffset,latestTimeOffset,和latestEmittedOffset,其中latestTimeOffset - latestEmittedOffset就是spout lag 除了反映出每个partition的,还会算出所有的partitions的总数据
  1. context.registerMetric("kafkaOffset", new IMetric() {
  2.             KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
  3.             @Override
  4.             public Object getValueAndReset() {
  5.                 List<PartitionManager> pms = _coordinator.getMyManagedPartitions(); //从coordinator获取pms的信息
  6.                 Set<Partition> latestPartitions = new HashSet();
  7.                 for (PartitionManager pm : pms) {
  8.                     latestPartitions.add(pm.getPartition());
  9.                 }
  10.                 _kafkaOffsetMetric.refreshPartitions(latestPartitions); //根据最新的partition信息删除metric中已经不存在的partition的统计信息
  11.                 for (PartitionManager pm : pms) {
  12.                     _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset()); //更新metric中每个partition的已经完成的offset
  13.                 }
  14.                 return _kafkaOffsetMetric.getValueAndReset();
  15.             }
  16.         }, _spoutConfig.metricsTimeBucketSizeInSecs);
复制代码
_kafkaOffsetMetric.getValueAndReset,其实只是get,不需要reset
  1. @Override
  2.         public Object getValueAndReset() {
  3.             try {
  4.                 long totalSpoutLag = 0;
  5.                 long totalEarliestTimeOffset = 0;
  6.                 long totalLatestTimeOffset = 0;
  7.                 long totalLatestEmittedOffset = 0;
  8.                 HashMap ret = new HashMap();
  9.                 if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
  10.                     for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
  11.                         Partition partition = e.getKey();
  12.                         SimpleConsumer consumer = _connections.getConnection(partition);
  13.                         long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
  14.                         long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
  15.                         long latestEmittedOffset = e.getValue();
  16.                         long spoutLag = latestTimeOffset - latestEmittedOffset;
  17.                         ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
  18.                         ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
  19.                         ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
  20.                         ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
  21.                         totalSpoutLag += spoutLag;
  22.                         totalEarliestTimeOffset += earliestTimeOffset;
  23.                         totalLatestTimeOffset += latestTimeOffset;
  24.                         totalLatestEmittedOffset += latestEmittedOffset;
  25.                     }
  26.                     ret.put("totalSpoutLag", totalSpoutLag);
  27.                     ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
  28.                     ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
  29.                     ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
  30.                     return ret;
  31.                 } else {
  32.                     LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
  33.                 }
  34.             } catch (Throwable t) {
  35.                 LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
  36.             }
  37.             return null;
  38.         }
复制代码
kafkaPartition
反映出从Kafka fetch数据的情况,fetchAPILatencyMax,fetchAPILatencyMean,fetchAPICallCount 和 fetchAPIMessageCount
  1. context.registerMetric("kafkaPartition", new IMetric() {
  2.             @Override
  3.             public Object getValueAndReset() {
  4.                 List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
  5.                 Map concatMetricsDataMaps = new HashMap();
  6.                 for (PartitionManager pm : pms) {
  7.                     concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
  8.                 }
  9.                 return concatMetricsDataMaps;
  10.             }
  11.         }, _spoutConfig.metricsTimeBucketSizeInSecs);
复制代码
pm.getMetricsDataMap()
  1. public Map getMetricsDataMap() {
  2.         Map ret = new HashMap();
  3.         ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
  4.         ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
  5.         ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
  6.         ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
  7.         return ret;
  8.     }
复制代码
更新的逻辑如下
  1. private void fill() {
  2.         long start = System.nanoTime();
  3.         ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
  4.         long end = System.nanoTime();
  5.         long millis = (end - start) / 1000000;
  6.         _fetchAPILatencyMax.update(millis);
  7.         _fetchAPILatencyMean.update(millis);
  8.         _fetchAPICallCount.incr();
  9.         int numMessages = countMessages(msgs);
  10.         _fetchAPIMessageCount.incrBy(numMessages);
  11. }
复制代码
我们在读取kafka时,首先是关心,每个partition的读取状况,这个通过取得KafkaOffset Metrics就可以知道。再者,我们需要replay数据,使用high-level接口的时候可以通过系统提供的工具,这里如何搞?看下下面的代码,
第一个if,是从配置文件里面没有读到配置的情况
第二个else if,当topologyInstanceId发生变化时,并且forceFromStart为true时,就会取startOffsetTime指定的offset(Latest或Earliest) 这个topologyInstanceId, 每次KafkaSpout对象生成的时候随机产生,
String _uuid = UUID.randomUUID().toString();
Spout对象是在topology提交时,在client端生成一次的,所以如果topology停止,再重新启动,这个id一定会发生变化。所以应该是只需要把forceFromStart设为true,再重启topology,就可以实现replay。
  1. if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
  2.             _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
  3.             LOG.info("No partition information found, using configuration to determine offset");
  4.         } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
  5.             _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
  6.             LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
  7.         } else {
  8.             _committedTo = jsonOffset;
  9.             LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
  10.         }
复制代码
代码例子
storm-kafka的文档很差,最后附上使用的例子
  1. import storm.kafka.KafkaSpout;
  2. import storm.kafka.SpoutConfig;
  3. import storm.kafka.BrokerHosts;
  4. import storm.kafka.ZkHosts;
  5. import storm.kafka.KeyValueSchemeAsMultiScheme;
  6. import storm.kafka.KeyValueScheme;
  7.     public static class SimplekVScheme implements KeyValueScheme { //定义scheme
  8.         @Override
  9.         public List<Object> deserializeKeyAndValue(byte[] key, byte[] value){
  10.             ArrayList tuple = new ArrayList();
  11.             tuple.add(key);
  12.             tuple.add(value);
  13.             return tuple;
  14.         }
  15.         
  16.         @Override
  17.         public List<Object> deserialize(byte[] bytes) {
  18.             ArrayList tuple = new ArrayList();
  19.             tuple.add(bytes);
  20.             return tuple;
  21.         }
  22.         @Override
  23.         public Fields getOutputFields() {
  24.             return new Fields("key","value");
  25.         }
  26.     }   
  27.         String topic = “test”;  //
  28.         String zkRoot = “/kafkastorm”; //
  29.         String spoutId = “id”; //读取的status会被存在,/kafkastorm/id下面,所以id类似consumer group
  30.         
  31.         BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181");
  32.         SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
  33.         spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new SimplekVScheme());
  34.         
  35.         /*spoutConfig.zkServers = new ArrayList<String>(){{ //只有在local模式下需要记录读取状态时,才需要设置
  36.             add("10.118.136.107");
  37.         }};
  38.         spoutConfig.zkPort = 2181;*/
  39.         
  40.         spoutConfig.forceFromStart = false;
  41.         spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();   
  42.         spoutConfig.metricsTimeBucketSizeInSecs = 6;
  43.         builder.setSpout(SqlCollectorTopologyDef.KAFKA_SPOUT_NAME, new KafkaSpout(spoutConfig), 1);
复制代码















已有(2)人评论

跳转到指定楼层
gqx1984 发表于 2015-3-21 17:20:57
先留个印,后面项目用到
回复

使用道具 举报

Mr.k 发表于 2017-4-11 23:14:38
多谢楼主分享
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条