问题导读
1.DynamicBrokersReader的作用是什么?
2.DynamicBrokersReader实现了哪些功能?
阅读前提:您可能需要对kafka有基本的认识,并且知道kafka与storm之间的关系
本章主题: 实现一个对于kafkaBroker 动态读取的Class - DynamicBrokersReader
本章精要: 1 关注kafka在Storm之上的信息注册
2 关注微观的逻辑过程
DynamicBrokersReader
- package com.mixbox.storm.kafka;
-
- import backtype.storm.Config;
- import backtype.storm.utils.Utils;
-
- import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
- import com.netflix.curator.framework.CuratorFramework;
- import com.netflix.curator.framework.CuratorFrameworkFactory;
- import com.netflix.curator.retry.RetryNTimes;
- import org.json.simple.JSONValue;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.util.List;
- import java.util.Map;
-
- /**
- * 动态的Broker读 我们维护了有一个与zk之间的连接,提供了获取指定topic的每一个partition正在活动着的leader所对应的broker
- * 这样你有能力知道,当前的这些topic,哪一些broker是活动的 * @author Yin Shuai
- */
-
- public class DynamicBrokersReader {
-
- public static final Logger LOG = LoggerFactory
- .getLogger(DynamicBrokersReader.class);
-
- // 对于Client CuratorFrameWork的封装
- private CuratorFramework _curator;
-
- // 在Zk上注册的位置
- private String _zkPath;
-
- // 指定的_topic
- private String _topic;
-
- public DynamicBrokersReader(Map conf, String zkStr, String zkPath,
- String topic) {
- _zkPath = zkPath;
- _topic = topic;
- try {
- _curator = CuratorFrameworkFactory
- .newClient(
- zkStr,
- Utils.getInt(conf
- .get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
- 15000,
- new RetryNTimes(
- Utils.getInt(conf
- .get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
- Utils.getInt(conf
- .get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- _curator.start();
- }
-
- public DynamicBrokersReader(String zkPath) {
- this._zkPath = zkPath;
- }
-
- /**
- * 确定指定topic下,每一个partition的leader,所对应的 主机和端口, 并将它们存入到全部分区信息中
- *
- */
- public GlobalPartitionInformation getBrokerInfo() {
- GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
- try {
-
- // 拿到当前的分区数目
- int numPartitionsForTopic = getNumPartitions();
-
- /**
- * /brokers/ids
- */
- String brokerInfoPath = brokerPath();
-
- // 默认的我们的分区数目就只有 0, 1 两个
- for (int partition = 0; partition < numPartitionsForTopic; partition++) {
-
- // 这里请主要参考分区和领导者的关系
- int leader = getLeaderFor(partition);
-
- // 拿到领导者以后的zookeeper路径
- String leaderPath = brokerInfoPath + "/" + leader;
-
- try {
-
- byte[] brokerData = _curator.getData().forPath(leaderPath);
-
- /**
- * 在这里, 我们拿到的brokerData为:
- * {"jmx_port":-1,"timestamp":"1403076810435"
- * ,"host":"192.168.50.207","version":1,"port":9092} 注意
- * 这里是字节数组开始转json
- */
- Broker hp = getBrokerHost(brokerData);
-
- /**
- * 记录好 每一个分区 partition 所对应的 Broker
- */
- globalPartitionInformation.addPartition(partition, hp);
-
- } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
- LOG.error("Node {} does not exist ", leaderPath);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- LOG.info("Read partition info from zookeeper: "
- + globalPartitionInformation);
- return globalPartitionInformation;
- }
-
- /**
- * @return 拿到指定topic下的分区数目
- */
- private int getNumPartitions() {
- try {
- String topicBrokersPath = partitionPath();
- List<String> children = _curator.getChildren().forPath(
- topicBrokersPath);
- return children.size();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * @return 拿到的topic在zookeeper注册的分区地址
- * brokers/topics/storm-sentence/partitions
- */
- public String partitionPath() {
- return _zkPath + "/topics/" + _topic + "/partitions";
- }
-
- /**
- * 持有的是Broker节点的id号码,这个id号是在配置的过程中为每一个Broker分配的
- * @return /brokers/ids
- */
- public String brokerPath() {
- return _zkPath + "/ids";
- }
-
- /**
- * get /brokers/topics/distributedTopic/partitions/1/state {
- * "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1,
- * "version":1 }
- *
- * 说明一下,在kafka之中,每一个分区都会有一个Leader,有0个或者多个的followers, 一个leader会处理这个分区的所有请求
- * @param partition
- * @return
- */
- private int getLeaderFor(long partition) {
- try {
- String topicBrokersPath = partitionPath();
- byte[] hostPortData = _curator.getData().forPath(
- topicBrokersPath + "/" + partition + "/state");
- @SuppressWarnings("unchecked")
- Map<Object, Object> value = (Map<Object, Object>) JSONValue
- .parse(new String(hostPortData, "UTF-8"));
- Integer leader = ((Number) value.get("leader")).intValue();
- return leader;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void close() {
- _curator.close();
- }
-
- /**
- * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0 {
- * "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
- *
- *
- * @param contents
- * @return
- */
- private Broker getBrokerHost(byte[] contents) {
- try {
- @SuppressWarnings("unchecked")
- Map<Object, Object> value = (Map<Object, Object>) JSONValue
- .parse(new String(contents, "UTF-8"));
- String host = (String) value.get("host");
- Integer port = ((Long) value.get("port")).intValue();
- return new Broker(host, port);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- }
复制代码
对于以上代码须知:
1 : 我们持有了一个ZkPath , 在Storm-kafka的class之中我们默认的是/brokers
2 : _topic , 目前我们是针对的是Topic, 也就是说我们的partition,leader都是针对于单个Topic的
3:
- int numPartitionsForTopic = getNumPartitions();
复制代码
针对与一个Topic,首先我们要取当前的分区数,一般的情况,我们在kafka之中默认的分区数为2
- String brokerInfoPath = brokerPath();
复制代码
拿到 /brokers/ids 这一broker注册路径
- for (int partition = 0; partition < numPartitionsForTopic; partition++) {
复制代码
依次的遍历每一个分区
- int leader = getLeaderFor(partition);String leaderPath = brokerInfoPath + "/" + leader;byte[] brokerData = _curator.getData().forPath(leaderPath);
复制代码
再通过分区拿到领导者,以及领导者的路径,最后拿到领导者的数据:
我们举一个小例子
* 在这里, 我们拿到的brokerData为:
* {"jmx_port":-1,"timestamp":"1403076810435"
* ,"host":"192.168.50.207","version":1,"port":9092}
- Broker hp = getBrokerHost(brokerData);
复制代码
拿到某一个Topic自己的分区在kafka所对应的Broker,并且其封装到 globalPartitionInformation
- globalPartitionInformation.addPartition(partition, hp);
复制代码
GlobalPartitionInformation底层维护了一个HashMap
总结: 简单的来说:DynamicBrokersReader 针对某一个Topic维护了 每一个分区 partition 所对应的 Broker
|