分享

Storm-kafka【接口实现】-1 DynamicBrokersReader

desehawk 发表于 2015-1-10 17:18:29 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 28988

问题导读

1.DynamicBrokersReader的作用是什么?
2.DynamicBrokersReader实现了哪些功能?








阅读前提:您可能需要对kafka有基本的认识,并且和idaokafka-storm之中的关系

本章主题: 实现一个对于kafkaBroker 动态读取的Class - DynamicBrokersReader

本章精要: 1 关注kafka在Storm之上的信息注册

                  2 关注微观的逻辑过程



DynamicBrokersReader

  1. package com.mixbox.storm.kafka;
  2. import backtype.storm.Config;
  3. import backtype.storm.utils.Utils;
  4. import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
  5. import com.netflix.curator.framework.CuratorFramework;
  6. import com.netflix.curator.framework.CuratorFrameworkFactory;
  7. import com.netflix.curator.retry.RetryNTimes;
  8. import org.json.simple.JSONValue;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import java.io.IOException;
  12. import java.io.UnsupportedEncodingException;
  13. import java.util.List;
  14. import java.util.Map;
  15. /**
  16. * 动态的Broker读 我们维护了有一个与zk之间的连接,提供了获取指定topic的每一个partition正在活动着的leader所对应的broker
  17. * 这样你有能力知道,当前的这些topic,哪一些broker是活动的 * @author Yin Shuai
  18. */
  19. public class DynamicBrokersReader {
  20.     public static final Logger LOG = LoggerFactory
  21.             .getLogger(DynamicBrokersReader.class);
  22.     // 对于Client CuratorFrameWork的封装
  23.     private CuratorFramework _curator;
  24.     // 在Zk上注册的位置
  25.     private String _zkPath;
  26.     // 指定的_topic
  27.     private String _topic;
  28.     public DynamicBrokersReader(Map conf, String zkStr, String zkPath,
  29.             String topic) {
  30.         _zkPath = zkPath;
  31.         _topic = topic;
  32.         try {
  33.             _curator = CuratorFrameworkFactory
  34.                     .newClient(
  35.                             zkStr,
  36.                             Utils.getInt(conf
  37.                                     .get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
  38.                             15000,
  39.                             new RetryNTimes(
  40.                                     Utils.getInt(conf
  41.                                             .get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
  42.                                     Utils.getInt(conf
  43.                                             .get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
  44.         } catch (IOException e) {
  45.             // TODO Auto-generated catch block
  46.             e.printStackTrace();
  47.         }
  48.         _curator.start();
  49.     }
  50.     public DynamicBrokersReader(String zkPath) {
  51.         this._zkPath = zkPath;
  52.     }
  53.     /**
  54.      * 确定指定topic下,每一个partition的leader,所对应的 主机和端口, 并将它们存入到全部分区信息中
  55.      *
  56.      */
  57.     public GlobalPartitionInformation getBrokerInfo() {
  58.         GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
  59.         try {
  60.             // 拿到当前的分区数目
  61.             int numPartitionsForTopic = getNumPartitions();
  62.             /**
  63.              * /brokers/ids
  64.              */
  65.             String brokerInfoPath = brokerPath();
  66.             // 默认的我们的分区数目就只有 0, 1 两个
  67.             for (int partition = 0; partition < numPartitionsForTopic; partition++) {
  68.                 // 这里请主要参考分区和领导者的关系
  69.                 int leader = getLeaderFor(partition);
  70.                 // 拿到领导者以后的zookeeper路径
  71.                 String leaderPath = brokerInfoPath + "/" + leader;
  72.                 try {
  73.                     byte[] brokerData = _curator.getData().forPath(leaderPath);
  74.                     /**
  75.                      * 在这里, 我们拿到的brokerData为:
  76.                      * {"jmx_port":-1,"timestamp":"1403076810435"
  77.                      * ,"host":"192.168.50.207","version":1,"port":9092} 注意
  78.                      * 这里是字节数组开始转json
  79.                      */
  80.                     Broker hp = getBrokerHost(brokerData);
  81.                     /**
  82.                      * 记录好 每一个分区 partition 所对应的 Broker
  83.                      */
  84.                     globalPartitionInformation.addPartition(partition, hp);
  85.                 } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
  86.                     LOG.error("Node {} does not exist ", leaderPath);
  87.                 }
  88.             }
  89.         } catch (Exception e) {
  90.             throw new RuntimeException(e);
  91.         }
  92.         LOG.info("Read partition info from zookeeper: "
  93.                 + globalPartitionInformation);
  94.         return globalPartitionInformation;
  95.     }
  96.     /**
  97.      * @return 拿到指定topic下的分区数目
  98.      */
  99.     private int getNumPartitions() {
  100.         try {
  101.             String topicBrokersPath = partitionPath();
  102.             List<String> children = _curator.getChildren().forPath(
  103.                     topicBrokersPath);
  104.             return children.size();
  105.         } catch (Exception e) {
  106.             throw new RuntimeException(e);
  107.         }
  108.     }
  109.     /**
  110.      * @return 拿到的topic在zookeeper注册的分区地址
  111.      *         brokers/topics/storm-sentence/partitions
  112.      */
  113.     public String partitionPath() {
  114.         return _zkPath + "/topics/" + _topic + "/partitions";
  115.     }
  116.     /**
  117.      *  持有的是Broker节点的id号码,这个id号是在配置的过程中为每一个Broker分配的
  118.      * @return   /brokers/ids
  119.      */
  120.     public String brokerPath() {
  121.         return _zkPath + "/ids";
  122.     }
  123.     /**
  124.      * get /brokers/topics/distributedTopic/partitions/1/state {
  125.      * "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1,
  126.      * "version":1 }
  127.      *
  128.      * 说明一下,在kafka之中,每一个分区都会有一个Leader,有0个或者多个的followers, 一个leader会处理这个分区的所有请求
  129.      * @param partition
  130.      * @return
  131.      */
  132.     private int getLeaderFor(long partition) {
  133.         try {
  134.             String topicBrokersPath = partitionPath();
  135.             byte[] hostPortData = _curator.getData().forPath(
  136.                     topicBrokersPath + "/" + partition + "/state");
  137.             @SuppressWarnings("unchecked")
  138.             Map<Object, Object> value = (Map<Object, Object>) JSONValue
  139.                     .parse(new String(hostPortData, "UTF-8"));
  140.             Integer leader = ((Number) value.get("leader")).intValue();
  141.             return leader;
  142.         } catch (Exception e) {
  143.             throw new RuntimeException(e);
  144.         }
  145.     }
  146.     public void close() {
  147.         _curator.close();
  148.     }
  149.     /**
  150.      * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0 {
  151.      * "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
  152.      *
  153.      *
  154.      * @param contents
  155.      * @return
  156.      */
  157.     private Broker getBrokerHost(byte[] contents) {
  158.         try {
  159.             @SuppressWarnings("unchecked")
  160.             Map<Object, Object> value = (Map<Object, Object>) JSONValue
  161.                     .parse(new String(contents, "UTF-8"));
  162.             String host = (String) value.get("host");
  163.             Integer port = ((Long) value.get("port")).intValue();
  164.             return new Broker(host, port);
  165.         } catch (UnsupportedEncodingException e) {
  166.             throw new RuntimeException(e);
  167.         }
  168.     }
  169. }
复制代码

对于以上代码须知:

1 : 我们持有了一个ZkPath , 在Storm-kafka的class之中我们默认的是/brokers



2 : _topic ,  目前我们是针对的是Topic, 也就是说我们的partition,leader都是针对于单个Topic的



3:   

  1. int numPartitionsForTopic = getNumPartitions();
复制代码

  针对与一个Topic,首先我们要取当前的分区数,一般的情况,我们在kafka之中默认的分区数为2
  1. String brokerInfoPath = brokerPath();
复制代码

拿到 /brokers/ids 的分区号

  1. for (int partition = 0; partition < numPartitionsForTopic; partition++) {
复制代码

  依次的遍历每一个分区
  1. int leader = getLeaderFor(partition);String leaderPath = brokerInfoPath + "/" + leader;byte[] brokerData = _curator.getData().forPath(leaderPath);
复制代码

再通过分区拿到领导者,以及领导者的路径,最后拿到领导者的数据:

       我们举一个小例子

        * 在这里, 我们拿到的brokerData为:

  * {"jmx_port":-1,"timestamp":"1403076810435"

  * ,"host":"192.168.50.207","version":1,"port":9092}


  1. Broker hp = getBrokerHost(brokerData);
复制代码
拿到某一个Topic自己的分区在kafka所对应的Broker,并且其封装到 globalPartitionInformation

  1. globalPartitionInformation.addPartition(partition, hp);
复制代码



GlobalPartitionInformaton底层维护了一个HashMap



     总结: 简单的来说:DynamicBrokersReader 针对某一个Topic维护了  每一个分区 partition 所对应的 Broker












已有(1)人评论

跳转到指定楼层
ainubis 发表于 2015-3-29 18:10:20
好资料,谢谢分享。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条