
Storm source code analysis: the topology submission process

desehawk, posted 2015-4-4 23:09:05

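Guiding questions

1. How is a topology's jar uploaded to nimbus?
2. In what language is storm's jar command implemented?
3. What does the jarfile parameter represent?
4. What is the entry point of a topology?
5. After main builds the topology, which StormSubmitter method does it call to submit it?
6. Which properties of a component does ComponentCommon define?
7. What are the three main jobs of the submitTopology method?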








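A Storm cluster runs topologies, and each topology is a graph made up of spouts and bolts. Once a topology program is written, we package it as a jar and run storm jar xxxxxx.jar xxxxxxxClass in a shell. This uploads the jar to the nimbus of the Storm cluster and runs the topology. This article analyzes how the topology jar gets uploaded to nimbus. We start with storm's jar command, which is implemented in the bin/storm file under the Storm root directory. It is defined as follows: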


    def jar(jarfile, klass, *args):
        """Syntax: [storm jar topology-jar-path class ...]
        Runs the main method of class with the specified arguments.
        The storm jars and configs in ~/.storm are put on the classpath.
        The process is configured so that StormSubmitter
        (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
        will upload the jar at topology-jar-path when the topology is submitted.
        """
        exec_storm_class(
            klass,
            jvmtype="-client",
            extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
            args=args,
            jvmopts=[' '.join(filter(None, [JAR_JVM_OPTS, "-Dstorm.jar=" + jarfile]))])




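The jar command is implemented in Python. Why it was not written in Clojure is unclear. The parameters are as follows:

- jarfile is the location of the jar.
- klass is the topology's entry point, i.e. the class that contains the main method.
- *args are the arguments passed to main.
- jvmtype="-client" selects the client JVM. The JVM comes in two types, client and server, and server machines default to server.
- extrajars lists extra classpath entries: the topology jar itself, the user configuration directory USER_CONF_DIR (~/.storm) and Storm's bin directory.
- jvmopts holds JVM arguments. The important one here is -Dstorm.jar, whose value is jarfile. This lets submitTopology read the jar path later from the storm.jar system property, so the JVM argument effectively passes a parameter to a method.

The logic of exec_storm_class is simple. Its implementation is as follows: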



    def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
        global CONFFILE
        all_args = [
            "java", jvmtype, get_config_opts(),
            "-Dstorm.home=" + STORM_DIR,
            "-Djava.library.path=" + confvalue("java.library.path", extrajars),
            "-Dstorm.conf.file=" + CONFFILE,
            "-cp", get_classpath(extrajars),
        ] + jvmopts + [klass] + list(args)
        print "Running: " + " ".join(all_args)
        if fork:
            os.spawnvp(os.P_WAIT, "java", all_args)
        else:
            os.execvp("java", all_args) # replaces the current process and never returns
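Together, the two functions turn storm jar into a single java invocation. Below is a minimal Python sketch of the argument list they build. STORM_DIR, CONFFILE, the jar name and the class name are hypothetical example values. get_config_opts(), java.library.path and Storm's own jars are left out for brevity. The function only returns the list and does not start Java.

```python
import os

# Hypothetical stand-ins for values that bin/storm computes at startup.
STORM_DIR = "/opt/storm"
USER_CONF_DIR = os.path.expanduser("~/.storm")
CONFFILE = ""


def jar_command(jarfile, klass, *args):
    """Return the java argv that `storm jar jarfile klass args...` would exec (simplified)."""
    extrajars = [jarfile, USER_CONF_DIR, STORM_DIR + "/bin"]
    # bin/storm also puts Storm's own jars (STORM_DIR and STORM_DIR/lib) on the classpath; omitted here.
    classpath = ":".join(extrajars)
    # The topology jar path travels as a JVM system property, not as an argument to main.
    jvmopts = ["-Dstorm.jar=" + jarfile]
    return [
        "java", "-client",
        "-Dstorm.home=" + STORM_DIR,
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", classpath,
    ] + jvmopts + [klass] + list(args)


if __name__ == "__main__":
    argv = jar_command("wordcount.jar", "storm.starter.WordCountTopology", "wc")
    print(" ".join(argv))
```

Because -Dstorm.jar=... comes before the main class name, java treats it as a JVM option and sets the storm.jar system property. Everything after the class name (here wc) reaches main as args.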




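The helper calls work as follows:

- get_config_opts() builds the -Dstorm.options=... JVM argument from any configuration options given on the command line with -c.
- confvalue("java.library.path", extrajars) gets the load path for native libraries used by Storm, such as JZMQ.
- get_classpath(extrajars) builds the full classpath: Storm's own jars plus extrajars.

These pieces are assembled into a java -cp command that runs the topology's main method. Execution then moves into that main method. As an example, here is the main method of WordCountTopology from the storm-starter project: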



    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 6);
        builder.setBolt("split", new SplitSentence(), 12).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 10).fieldsGrouping("split", new Fields("word"));
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            // cluster mode: args[0] is the topology name
            conf.setNumWorkers(4);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
            // local mode: run in an in-process LocalCluster for 10 seconds, then shut down
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }




After main builds the topology, it calls StormSubmitter.submitTopology to submit it. submitTopology is shown below:



    /**
     * Submits a topology to run on the cluster. A topology runs forever or until
     * explicitly killed.
     *
     *
     * @param name the name of the storm.
     * @param stormConf the topology-specific configuration. See {@link Config}.
     * @param topology the processing to execute.
     * @throws AlreadyAliveException if a topology with this name is already running
     * @throws InvalidTopologyException if an invalid topology was submitted
     */
    public static void submitTopology(String name, Map stormConf, StormTopology topology)
            throws AlreadyAliveException, InvalidTopologyException {
        submitTopology(name, stormConf, topology, null);
    }

    /**
     * Submits a topology to run on the cluster. A topology runs forever or until
     * explicitly killed.
     *
     *
     * @param name the name of the storm.
     * @param stormConf the topology-specific configuration. See {@link Config}.
     * @param topology the processing to execute.
     * @param options to manipulate the starting of the topology
     * @throws AlreadyAliveException if a topology with this name is already running
     * @throws InvalidTopologyException if an invalid topology was submitted
     */
    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
            throws AlreadyAliveException, InvalidTopologyException {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        try {
            String serConf = JSONValue.toJSONString(stormConf);
            if(localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                localNimbus.submitTopology(name, null, serConf, topology);
            } else {
                NimbusClient client = NimbusClient.getConfiguredClient(conf);
                if(topologyNameExists(conf, name)) {
                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                }
                submitJar(conf);
                try {
                    LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
                    if(opts!=null) {
                        client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                    } else {
                        // this is for backwards compatibility
                        client.getClient().submitTopology(name, submittedJar, serConf, topology);
                    }
                } catch(InvalidTopologyException e) {
                    LOG.warn("Topology submission exception", e);
                    throw e;
                } catch(AlreadyAliveException e) {
                    LOG.warn("Topology already alive exception", e);
                    throw e;
                } finally {
                    client.close();
                }
            }
            LOG.info("Finished submitting topology: " +  name);
        } catch(TException e) {
            throw new RuntimeException(e);
        }
    }




submitTopology does three main things:

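1. Prepare the configuration
Command-line options are put into stormConf. The cluster configuration (defaults.yaml overlaid with storm.yaml) is read into conf, and then stormConf is put into conf as well. As a result, command-line options override settings made in the topology code, and both override storm.yaml. stormConf is converted to JSON because this configuration is sent to the server. The merged conf stays on the client, where it is used, for example, to locate nimbus.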
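The precedence rules above can be checked with a short Python sketch that replays the merges with plain dicts. build_submit_confs and the example keys and values are illustrative. json.dumps stands in for JSONValue.toJSONString.

```python
import json


def build_submit_confs(topology_conf, command_line_opts, cluster_conf):
    """Replay the config merges at the top of StormSubmitter.submitTopology.

    Returns (conf, ser_conf): conf is the merged map used on the client side
    (for example to locate nimbus); ser_conf is the JSON that is sent to nimbus.
    """
    storm_conf = dict(topology_conf)      # stormConf = new HashMap(stormConf)
    storm_conf.update(command_line_opts)  # command-line options override code settings
    conf = dict(cluster_conf)             # Utils.readStormConfig(): defaults + storm.yaml
    conf.update(storm_conf)               # topology settings override the cluster file
    ser_conf = json.dumps(storm_conf)     # only stormConf is serialized and sent
    return conf, ser_conf


if __name__ == "__main__":
    conf, ser_conf = build_submit_confs(
        {"topology.workers": 4, "topology.debug": True},
        {"topology.workers": 8},
        {"nimbus.host": "nimbus1", "topology.workers": 1},
    )
    print(conf["topology.workers"], ser_conf)
```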

2. Call submitJar


    submitJar(conf);

    private static void submitJar(Map conf) {
        if(submittedJar==null) {
            LOG.info("Jar not uploaded to master yet. Submitting jar...");
            String localJar = System.getProperty("storm.jar");
            submittedJar = submitJar(conf, localJar);
        } else {
            LOG.info("Jar already uploaded to master. Not submitting jar.");
        }
    }

System.getProperty("storm.jar") reads the storm.jar JVM property, i.e. the path of the topology jar. The overloaded submitJar is then called:

    public static String submitJar(Map conf, String localJar) {
        if(localJar==null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
        }
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        try {
            String uploadLocation = client.getClient().beginFileUpload();
            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
            BufferFileInputStream is = new BufferFileInputStream(localJar);
            while(true) {
                byte[] toSubmit = is.read();
                if(toSubmit.length==0) break;
                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
            }
            client.getClient().finishFileUpload(uploadLocation);
            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
            return uploadLocation;
        } catch(Exception e) {
            throw new RuntimeException(e);
        } finally {
            client.close();
        }
    }



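StormSubmitter is essentially a Thrift client and Nimbus is a Thrift server, so every operation is a Thrift RPC. submitJar first creates a client, then calls beginFileUpload() on the nimbus Thrift server to get the path at which nimbus will store the jar. beginFileUpload looks like this: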


    (beginFileUpload [this]
      (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
        (.put (:uploaders nimbus)
              fileloc
              (Channels/newChannel (FileOutputStream. fileloc)))
        (log-message "Uploading file from client to " fileloc)
        fileloc
        ))




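(inbox nimbus) calls master-inbox, which creates the {storm.local.dir}/nimbus/inbox directory and returns its full path. The topology jar is then uploaded chunk by chunk via uploadChunk to {storm.local.dir}/nimbus/inbox/stormjar-{uuid}.jar on nimbus, where {uuid} is a random UUID. Finally, finishFileUpload closes the file.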
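The upload protocol can be simulated end to end in Python: beginFileUpload, then uploadChunk repeatedly, then finishFileUpload. FakeNimbus below is a hypothetical in-memory stand-in for the nimbus Thrift server. The inbox path is an example value, and the 4-byte chunk size is chosen only to force several chunks. This is a sketch of the protocol, not Storm's implementation.

```python
import io
import uuid


class FakeNimbus:
    """Hypothetical in-memory stand-in for the three nimbus upload RPCs."""

    def __init__(self, inbox="/storm-local/nimbus/inbox"):
        self.inbox = inbox
        self.uploaders = {}  # location -> open buffer, like (:uploaders nimbus)
        self.files = {}      # location -> bytes of finished uploads

    def begin_file_upload(self):
        loc = "%s/stormjar-%s.jar" % (self.inbox, uuid.uuid4())
        self.uploaders[loc] = io.BytesIO()
        return loc

    def upload_chunk(self, loc, chunk):
        self.uploaders[loc].write(chunk)

    def finish_file_upload(self, loc):
        buf = self.uploaders.pop(loc)  # nimbus closes the channel and forgets it
        self.files[loc] = buf.getvalue()


def submit_jar(nimbus, jar_bytes, chunk_size=4):
    """Client side of submitJar: begin, stream fixed-size chunks, finish."""
    loc = nimbus.begin_file_upload()
    stream = io.BytesIO(jar_bytes)
    while True:
        chunk = stream.read(chunk_size)
        if len(chunk) == 0:  # the Java loop also stops on an empty chunk
            break
        nimbus.upload_chunk(loc, chunk)
    nimbus.finish_file_upload(loc)
    return loc
```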

3. Create a Thrift client and call submitTopologyWithOpts or submitTopology on the nimbus Thrift server (both are defined in nimbus.clj). submitTopologyWithOpts is as follows:



    (^void submitTopologyWithOpts
      [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
       ^SubmitOptions submitOptions]
      (try
        (assert (not-nil? submitOptions))
        (validate-topology-name! storm-name)
        (check-storm-active! nimbus storm-name false)
        (let [topo-conf (from-json serializedConf)]
          (try
            (validate-configs-with-schemas topo-conf)
            (catch IllegalArgumentException ex
              (throw (InvalidTopologyException. (.getMessage ex)))))
          (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                     storm-name
                     topo-conf
                     topology))
        (swap! (:submitted-count nimbus) inc)
        (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
              storm-conf (normalize-conf
                          conf
                          (-> serializedConf
                              from-json
                              (assoc STORM-ID storm-id)
                              (assoc TOPOLOGY-NAME storm-name))
                          topology)
              total-storm-conf (merge conf storm-conf)
              topology (normalize-topology total-storm-conf topology)
              storm-cluster-state (:storm-cluster-state nimbus)]
          (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
          (log-message "Received topology submission for " storm-name " with conf " storm-conf)
          ;; lock protects against multiple topologies being submitted at once and
          ;; cleanup thread killing topology in b/w assignment and starting the topology
          (locking (:submit-lock nimbus)
            (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
            (.setup-heartbeats! storm-cluster-state storm-id)
            (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                            TopologyInitialStatus/ACTIVE :active}]
              (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
            (mk-assignments nimbus)))
        (catch Throwable e
          (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
          (throw e))))




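The parameters of submitTopologyWithOpts are:

- storm-name: the topology name.
- uploadedJarLocation: the jar's location on nimbus.
- serializedConf: the topology's serialized configuration.
- topology: the topology as a Thrift structure.

The topology structure is defined in storm.thrift as follows: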



    struct StormTopology {
      //ids must be unique across maps
      // #workers to use is in conf
      1: required map<string, SpoutSpec> spouts;
      2: required map<string, Bolt> bolts;
      3: required map<string, StateSpoutSpec> state_spouts;
    }




spouts maps spout ids to spouts, and bolts maps bolt ids to bolts. State spouts (StateSpoutSpec) are not implemented yet. SpoutSpec is defined as follows:


    struct SpoutSpec {
      1: required ComponentObject spout_object;
      2: required ComponentCommon common;
      // can force a spout to be non-distributed by overriding the component configuration
      // and setting TOPOLOGY_MAX_TASK_PARALLELISM to 1
    }




Bolt is defined as follows:

    struct Bolt {
      1: required ComponentObject bolt_object;
      2: required ComponentCommon common;
    }





Bolt and Spout have the same structure: each consists of one ComponentObject and one ComponentCommon. ComponentObject is defined as follows:


    union ComponentObject {
      1: binary serialized_java;
      2: ShellComponent shell;
      3: JavaObject java_object;
    }




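ComponentObject is the implementation of the bolt (or spout). It can be one of three types:

1. A serialized Java object (implementing the IBolt interface).
2. A ShellComponent object, meaning the bolt is implemented in another language. If a bolt is defined this way, Storm instantiates a ShellBolt to handle communication between the JVM-based worker process and the non-JVM implementation of the component (the bolt).
3. A JavaObject structure, which tells Storm the class name and constructor arguments needed to instantiate the bolt. This is useful when you define a topology in a non-JVM language. You can then use JVM-based spouts and bolts without having to create and serialize their Java objects yourself.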
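Because ComponentObject is a Thrift union, exactly one of its three fields is set. A minimal Python sketch of that constraint follows. The field lists of ShellComponent and JavaObject are simplified assumptions, not a copy of storm.thrift.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ShellComponent:
    execution_command: str  # e.g. "python"
    script: str             # e.g. "splitsentence.py"


@dataclass
class JavaObject:
    full_class_name: str
    args_list: List[object] = field(default_factory=list)


@dataclass
class ComponentObject:
    """Mirror of the Thrift union: exactly one of the three fields is set."""
    serialized_java: Optional[bytes] = None
    shell: Optional[ShellComponent] = None
    java_object: Optional[JavaObject] = None

    def __post_init__(self):
        if len(self.set_fields()) != 1:
            raise ValueError("exactly one field must be set, got %r" % self.set_fields())

    def set_fields(self):
        return [name for name in ("serialized_java", "shell", "java_object")
                if getattr(self, name) is not None]

    def kind(self):
        return self.set_fields()[0]
```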

ComponentCommon is defined as follows:


    struct ComponentCommon {
      1: required map<GlobalStreamId, Grouping> inputs;
      2: required map<string, StreamInfo> streams; //key is stream id
      3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
      // component specific configuration respects:
      // topology.debug: false
      // topology.max.task.parallelism: null // can replace isDistributed with this
      // topology.max.spout.pending: null
      // topology.kryo.register // this is the only additive one
      // component specific configuration
      4: optional string json_conf;
    }




GlobalStreamId is defined as follows:


    struct GlobalStreamId {
      1: required string componentId;
      2: required string streamId;
      #Going to need to add an enum for the stream type (NORMAL or FAILURE)
    }




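ComponentCommon defines all the other properties of the component, including:

1. Which streams this component consumes (a map from component_id:stream_id, i.e. a GlobalStreamId, to the stream grouping to use).
2. Which streams it emits and their metadata (whether the stream is a direct stream, and the fields declared for it).
3. The component's parallelism.
4. The component's configuration.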
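To make the four properties concrete, here is a Python sketch that describes the count bolt of WordCountTopology with simplified stand-ins for ComponentCommon, GlobalStreamId and StreamInfo. Groupings are reduced to string labels, the stream id is Storm's default stream id "default", and upstream_components is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Dict, List, NamedTuple, Optional


class GlobalStreamId(NamedTuple):
    component_id: str
    stream_id: str


@dataclass
class StreamInfo:
    output_fields: List[str]
    direct: bool = False


@dataclass
class ComponentCommon:
    inputs: Dict[GlobalStreamId, str]   # consumed stream -> grouping (simplified to a label)
    streams: Dict[str, StreamInfo]      # emitted stream id -> metadata
    parallelism_hint: Optional[int] = None
    json_conf: Optional[str] = None     # component-specific configuration as JSON


def upstream_components(common):
    """Ids of the components this component subscribes to (hypothetical helper)."""
    return sorted({sid.component_id for sid in common.inputs})


# The "count" bolt of WordCountTopology, described in these terms.
count_common = ComponentCommon(
    inputs={GlobalStreamId("split", "default"): "fields:word"},
    streams={"default": StreamInfo(["word", "count"])},
    parallelism_hint=10,
)
```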

(assert (not-nil? submitOptions)) throws java.lang.AssertionError if submitOptions is nil. (validate-topology-name! storm-name) validates the topology name. validate-topology-name! is defined as follows:

    (defn validate-topology-name! [name]
      (if (some #(.contains name %) DISALLOWED-TOPOLOGY-NAME-STRS)
        (throw (InvalidTopologyException.
                (str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS))))
        (if (clojure.string/blank? name)
          (throw (InvalidTopologyException.
                  ;; the quoted source writes ("Topology name cannot be blank"), which would try
                  ;; to call the string as a function; a plain string message is intended
                  "Topology name cannot be blank")))))



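DISALLOWED-TOPOLOGY-NAME-STRS is defined as follows:

    (def DISALLOWED-TOPOLOGY-NAME-STRS #{"/" "." ":" "\\"})

It holds the special characters that may not appear in a topology name. The first argument to some is an anonymous function. some applies it to each element of DISALLOWED-TOPOLOGY-NAME-STRS and returns true as soon as one call returns true. So validate-topology-name! checks whether the name contains an illegal character and, if not, whether it is blank.

check-storm-active! checks whether a topology with this name is already active. It is defined as follows: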


    (defn check-storm-active! [nimbus storm-name active?]
      (if (= (not active?)
             (storm-active? (:storm-cluster-state nimbus)
                            storm-name))
        (if active?
          (throw (NotAliveException. (str storm-name " is not alive")))
          (throw (AlreadyAliveException. (str storm-name " is already active"))))
        ))
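The two pre-checks can be mirrored in Python to see which calls pass and which throw. The set active_names is a hypothetical stand-in for the ZooKeeper lookup done by storm-active?, and the exception classes are local placeholders for the Thrift exceptions.

```python
DISALLOWED_TOPOLOGY_NAME_STRS = {"/", ".", ":", "\\"}


class InvalidTopologyException(Exception):
    pass


class AlreadyAliveException(Exception):
    pass


class NotAliveException(Exception):
    pass


def validate_topology_name(name):
    # like (some #(.contains name %) DISALLOWED-TOPOLOGY-NAME-STRS)
    if any(s in name for s in DISALLOWED_TOPOLOGY_NAME_STRS):
        raise InvalidTopologyException(
            "Topology name cannot contain any of the following: %s"
            % sorted(DISALLOWED_TOPOLOGY_NAME_STRS))
    if not name.strip():  # clojure.string/blank?
        raise InvalidTopologyException("Topology name cannot be blank")


def check_storm_active(active_names, storm_name, active):
    """Same test as check-storm-active!: throw when (not active?) == storm-active?."""
    storm_active = storm_name in active_names  # stand-in for the ZooKeeper lookup
    if (not active) == storm_active:
        if active:
            raise NotAliveException(storm_name + " is not alive")
        raise AlreadyAliveException(storm_name + " is already active")
```

Submission calls check_storm_active(..., active=False). For a new name the condition is True == False, so nothing is raised. It only raises AlreadyAliveException when the name is already active.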



nimbus is a map that holds the current state of the nimbus Thrift server. It is created by the nimbus-data function:

    (defn nimbus-data [conf inimbus]
      (let [forced-scheduler (.getForcedScheduler inimbus)]
        {:conf conf
         :inimbus inimbus
         :submitted-count (atom 0)
         :storm-cluster-state (cluster/mk-storm-cluster-state conf)
         :submit-lock (Object.)
         :heartbeats-cache (atom {})
         :downloaders (file-cache-map conf)
         :uploaders (file-cache-map conf)
         :uptime (uptime-computer)
         :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
         :timer (mk-timer :kill-fn (fn [t]
                                     (log-error t "Error when processing event")
                                     (exit-process! 20 "Error when processing an event")
                                     ))
         :scheduler (mk-scheduler conf inimbus)
         }))



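conf holds the Storm cluster configuration, inimbus is the current INimbus instance, and cluster/mk-storm-cluster-state returns an instance that implements the StormClusterState protocol. storm-active? is defined as follows: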

    (defn storm-active? [storm-cluster-state storm-name]
      (not-nil? (get-storm-id storm-cluster-state storm-name)))
It calls get-storm-id to look up the id of the topology with the given name. If the id exists it returns true, otherwise false. get-storm-id is defined as follows:

    (defn get-storm-id [storm-cluster-state storm-name]
      (let [active-storms (.active-storms storm-cluster-state)]
        (find-first
         #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
         active-storms)
        ))
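active-storms returns all children of the /storms node in ZooKeeper (/storm/storms with the default ZooKeeper root). Each /storms/{topology-id} node holds information about a currently running topology. For what is stored there, see the StormBase record in common.clj: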

    (defrecord StormBase [storm-name launch-time-secs status num-workers component->executors])
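find-first returns the id of the first active topology whose name equals storm-name.

When we submit a topology under a new name, /storms in ZooKeeper has no corresponding {topology-id} node, so storm-active? returns false. The condition of the first if in check-storm-active! is therefore (= true false), which is false, so nothing is thrown and the check passes. Had a topology with that name already been active, the condition would be (= true true) and AlreadyAliveException would be thrown.

Next, the topology configuration is bound to topo-conf, and validate-configs-with-schemas checks that it is valid. validate-configs-with-schemas is defined as follows: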


    (defn validate-configs-with-schemas
      [conf]
      (doseq [[k v] conf
              :let [schema (CONFIG-SCHEMA-MAP k)]]
        (if (not (nil? schema))
          (.validateField schema k v))))

CONFIG-SCHEMA-MAP is defined as follows:

    ;; Create a mapping of config-string -> validator
    ;; Config fields must have a _SCHEMA field defined
    (def CONFIG-SCHEMA-MAP
      (->> (.getFields Config)
           (filter #(not (re-matches #".*_SCHEMA$" (.getName %))))
           (map (fn [f] [(.get f nil)
                         (get-FieldValidator
                          (-> Config
                              (.getField (str (.getName f) "_SCHEMA"))
                              (.get nil)))]))
           (into {})))



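Config.java has two kinds of static fields: configuration keys and the validators for them. Validator fields end in _SCHEMA. CONFIG-SCHEMA-MAP maps each configuration key string (the value of a non-_SCHEMA field, such as "topology.workers") to its validator, config-string -> validator.

validate-configs-with-schemas looks up the validator for each configuration key and validates the value. Keys without a validator are skipped. For the validators themselves, see the nested FieldValidator type in the ConfigValidation class. (:validator nimbus) returns an instance that implements the backtype.storm.nimbus.ITopologyValidator interface (by default a backtype.storm.nimbus.DefaultTopologyValidator), and its validate method is called. backtype.storm.nimbus.DefaultTopologyValidator looks like this: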


    public class DefaultTopologyValidator implements ITopologyValidator {
        @Override
        public void prepare(Map StormConf){
        }

        @Override
        public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException {
        }
    }



By default, validate is an empty implementation.
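The two validation stages fit in a short Python sketch: a schema check per config key, then the pluggable validator. The miniature Config class, the type-based validators and validate_submission are hypothetical simplifications of Config.java, ConfigValidation and the nimbus code above.

```python
class InvalidTopologyException(Exception):
    pass


class Config:
    """Hypothetical miniature of backtype.storm.Config: keys plus *_SCHEMA validators."""
    TOPOLOGY_WORKERS = "topology.workers"
    TOPOLOGY_WORKERS_SCHEMA = int
    TOPOLOGY_DEBUG = "topology.debug"
    TOPOLOGY_DEBUG_SCHEMA = bool


def build_schema_map(cls):
    """Like CONFIG-SCHEMA-MAP: config-string -> validator, found via the *_SCHEMA fields."""
    schema_map = {}
    for name in vars(cls):
        if name.startswith("_") or name.endswith("_SCHEMA"):
            continue
        schema_map[getattr(cls, name)] = getattr(cls, name + "_SCHEMA")
    return schema_map


CONFIG_SCHEMA_MAP = build_schema_map(Config)


def validate_configs_with_schemas(conf):
    for k, v in conf.items():
        schema = CONFIG_SCHEMA_MAP.get(k)
        if schema is not None and not isinstance(v, schema):  # unknown keys are skipped
            raise ValueError("Field %s must be of type %s" % (k, schema.__name__))


class DefaultTopologyValidator:
    def validate(self, topology_name, topology_conf, topology):
        pass  # empty, like the Java default


def validate_submission(name, topo_conf, topology, validator=None):
    try:
        validate_configs_with_schemas(topo_conf)
    except ValueError as ex:  # IllegalArgumentException -> InvalidTopologyException
        raise InvalidTopologyException(str(ex))
    (validator or DefaultTopologyValidator()).validate(name, topo_conf, topology)
```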
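The next bindings work as follows:

- swap! increments (:submitted-count nimbus), which counts the topologies submitted so far. It is an atom, Clojure's atomic reference type, comparable to Java's AtomicReference.
- storm-id is bound to the topology id.
- storm-conf is bound to the topology configuration plus settings derived by combining it with the cluster configuration: the Kryo serializers, the classes to register for serialization, the number of ackers and the maximum task parallelism.
- total-storm-conf is bound to the complete configuration.

The main job of normalize-topology is to add a "topology.tasks" setting (the task count) to every component.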
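The storm-id format is simple enough to check directly. In this Python sketch SubmittedCount is a single-threaded stand-in for the atom, and the timestamps are example values.

```python
import time


class SubmittedCount:
    """Single-threaded stand-in for the (:submitted-count nimbus) atom."""

    def __init__(self):
        self.value = 0

    def swap_inc(self):
        self.value += 1  # (swap! (:submitted-count nimbus) inc)
        return self.value


def make_storm_id(storm_name, submitted_count, now_secs=None):
    """storm-id = name + "-" + submission count + "-" + current time in seconds."""
    count = submitted_count.swap_inc()
    if now_secs is None:
        now_secs = int(time.time())  # (current-time-secs)
    return "%s-%d-%d" % (storm_name, count, now_secs)
```

The counter lives in nimbus memory; nimbus-data creates it with (atom 0), so it starts again from 0 after a nimbus restart. The seconds timestamp helps keep ids from different nimbus runs apart.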

normalize-topology is defined as follows:

    (defn normalize-topology [storm-conf ^StormTopology topology]
      (let [ret (.deepCopy topology)]
        (doseq [[_ component] (all-components ret)]
          (.set_json_conf
           (.get_common component)
           (->> {TOPOLOGY-TASKS (component-parallelism storm-conf component)}
                (merge (component-conf component))
                to-json )))
        ret ))




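ret is bound to a deep copy of the topology. all-components returns a map from every component id to its spout/bolt object, and get_common fetches each spout/bolt's ComponentCommon. ->> is a Clojure macro (thread-last). Here it passes the {TOPOLOGY-TASKS ...} map as the last argument of merge, and merge's result as the last argument of to-json. component-parallelism is defined as follows: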

    (defn- component-parallelism [storm-conf component]
      (let [storm-conf (merge storm-conf (component-conf component))
            num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
            max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
            ]
        (if max-parallelism
          (min max-parallelism num-tasks)
          num-tasks)))
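component-parallelism is a private function that determines the value of "topology.tasks":

- num-start-executors returns the spout/bolt's parallelism hint, defaulting to 1 when none was set.
- num-tasks is bound to the component's task count: topology.tasks if it is configured (at component or topology level), otherwise the number of executors.
- max-parallelism is bound to the maximum task parallelism.

The function returns the smaller of num-tasks and max-parallelism, or just num-tasks if no maximum is set. normalize-topology then saves the configuration, now including "topology.tasks", into the json_conf field of each spout/bolt's ComponentCommon and returns the modified topology.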
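The task-count rule can be reproduced with a Python sketch over a dict-based topology. The dict layout (parallelism_hint, conf) is an assumption standing in for the Thrift objects. The merged configuration is kept as a dict instead of the JSON string Storm writes to json_conf. Clojure treats only nil and false as falsey, so the sketch tests for None rather than relying on Python's or.

```python
import copy

TOPOLOGY_TASKS = "topology.tasks"
TOPOLOGY_MAX_TASK_PARALLELISM = "topology.max.task.parallelism"


def num_start_executors(component):
    hint = component.get("parallelism_hint")
    return 1 if hint is None else hint  # an unset hint counts as 1


def component_parallelism(storm_conf, component):
    conf = dict(storm_conf)
    conf.update(component.get("conf", {}))  # component settings override topology settings
    tasks = conf.get(TOPOLOGY_TASKS)
    num_tasks = num_start_executors(component) if tasks is None else tasks
    max_parallelism = conf.get(TOPOLOGY_MAX_TASK_PARALLELISM)
    if max_parallelism is not None:
        return min(max_parallelism, num_tasks)
    return num_tasks


def normalize_topology(storm_conf, topology):
    """Return a deep copy whose components all carry an explicit topology.tasks."""
    ret = copy.deepcopy(topology)
    for component in ret.values():
        merged = dict(component.get("conf", {}))
        merged[TOPOLOGY_TASKS] = component_parallelism(storm_conf, component)
        component["conf"] = merged  # Storm stores this as JSON in json_conf
    return ret
```

With setMaxTaskParallelism(3), as in the local-mode branch of WordCountTopology.main, every component is capped at 3 tasks.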
system-topology! is defined as follows:

    (defn system-topology! [storm-conf ^StormTopology topology]
      (validate-basic! topology)
      (let [ret (.deepCopy topology)]
        (add-acker! storm-conf ret)
        (add-metric-components! storm-conf ret)
        (add-system-components! storm-conf ret)
        (add-metric-streams! ret)
        (add-system-streams! ret)
        (validate-structure! ret)
        ret
        ))
validate-basic! validates the basic information of the topology, and add-acker! adds the acker bolt. add-acker! is defined as follows:

    (defn add-acker! [storm-conf ^StormTopology ret]
      (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
            acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
                                             (new backtype.storm.daemon.acker)
                                             {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
                                              ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
                                              }
                                             :p num-executors
                                             :conf {TOPOLOGY-TASKS num-executors
                                                    TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]
        (dofor [[_ bolt] (.get_bolts ret)
                :let [common (.get_common bolt)]]
               (do
                 (.put_to_streams common ACKER-ACK-STREAM-ID (thrift/output-fields ["id" "ack-val"]))
                 (.put_to_streams common ACKER-FAIL-STREAM-ID (thrift/output-fields ["id"]))
                 ))
        (dofor [[_ spout] (.get_spouts ret)
                :let [common (.get_common spout)
                      spout-conf (merge
                                  (component-conf spout)
                                  {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
               (do
                 ;; this set up tick tuples to cause timeouts to be triggered
                 (.set_json_conf common (to-json spout-conf))
                 (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
                 (.put_to_inputs common
                                 (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
                                 (thrift/mk-direct-grouping))
                 (.put_to_inputs common
                                 (GlobalStreamId. ACKER-COMPONENT-ID ACKER-FAIL-STREAM-ID)
                                 (thrift/mk-direct-grouping))
                 ))
        (.put_to_bolts ret "__acker" acker-bolt)
        ))
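The acker sizing rule and the input map built by acker-inputs can be sketched in Python. The stream-id strings are illustrative values for the ACKER-* constants. A list of field names stands for a fields grouping on those fields. As far as I can tell, that is how mk-bolt-spec* interprets the ["id"] vectors.

```python
TOPOLOGY_WORKERS = "topology.workers"
TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors"
ACKER_INIT_STREAM_ID = "__ack_init"  # illustrative values for the ACKER-* constants
ACKER_ACK_STREAM_ID = "__ack_ack"
ACKER_FAIL_STREAM_ID = "__ack_fail"


def acker_num_executors(storm_conf):
    """topology.acker.executors if set, otherwise one acker per worker."""
    n = storm_conf.get(TOPOLOGY_ACKER_EXECUTORS)
    return storm_conf[TOPOLOGY_WORKERS] if n is None else n


def acker_inputs(spout_ids, bolt_ids):
    """Like acker-inputs: (component, stream) -> fields to group on."""
    inputs = {(sid, ACKER_INIT_STREAM_ID): ["id"] for sid in spout_ids}
    for bid in bolt_ids:
        inputs[(bid, ACKER_ACK_STREAM_ID)] = ["id"]
        inputs[(bid, ACKER_FAIL_STREAM_ID)] = ["id"]
    return inputs
```

Grouping on id routes every message about the same tuple tree to the same acker task.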



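The number of acker threads depends on whether "topology.acker.executors" is configured. If it is not, num-executors is bound to the value of "topology.workers". Otherwise it is bound to the value of "topology.acker.executors". acker-bolt is bound to the generated acker bolt spec. acker-inputs is defined as follows: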

    (defn acker-inputs [^StormTopology topology]
      (let [bolt-ids (.. topology get_bolts keySet)
            spout-ids (.. topology get_spouts keySet)
            spout-inputs (apply merge
                                (for [id spout-ids]
                                  {[id ACKER-INIT-STREAM-ID] ["id"]}
                                  ))
            bolt-inputs (apply merge
                               (for [id bolt-ids]
                                 {[id ACKER-ACK-STREAM-ID] ["id"]
                                  [id ACKER-FAIL-STREAM-ID] ["id"]}
                                 ))]
        (merge spout-inputs bolt-inputs)))
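In acker-inputs:

- bolt-ids is bound to the ids of all bolts in the topology.
- spout-ids is bound to the ids of all spouts.
- spout-inputs is bound to the acker's input streams from spouts.
- bolt-inputs is bound to the acker's input streams from bolts.

The merged input streams (a map) are returned. ACKER-ACK-STREAM-ID and ACKER-FAIL-STREAM-ID are the acker's output streams. TOPOLOGY-TICK-TUPLE-FREQ-SECS is the tick tuple interval, set here to the message timeout.

The first dofor adds the ACKER-ACK-STREAM-ID and ACKER-FAIL-STREAM-ID output streams to every bolt, so that bolts can send ack values to the acker bolt. The second dofor sets the tick tuple interval of every spout. It also adds the ACKER-INIT-STREAM-ID output stream that goes to the acker bolt, and subscribes the spout to the acker bolt's two direct output streams. This way the acker bolt can exchange ack information with spouts and bolts.

add-metric-components! adds the metrics consumer bolts to the topology definition. These bolts collect statistics about executors (threads). add-metric-components! is defined as follows: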


    (defn add-metric-components! [storm-conf ^StormTopology topology]
      (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
        (.put_to_bolts topology comp-id bolt-spec)))

metrics-consumer-bolt-specs is defined as follows:

    (defn metrics-consumer-bolt-specs [storm-conf topology]
      (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
            inputs (->> (for [comp-id component-ids-that-emit-metrics]
                          {[comp-id METRICS-STREAM-ID] :shuffle})
                        (into {}))
            mk-bolt-spec (fn [class arg p]
                           (thrift/mk-bolt-spec*
                            inputs
                            (backtype.storm.metric.MetricsConsumerBolt. class arg)
                            {} :p p :conf {TOPOLOGY-TASKS p}))]
        (map
         (fn [component-id register]
           [component-id (mk-bolt-spec (get register "class")
                                       (get register "argument")
                                       (or (get register "parallelism.hint") 1))])
         (metrics-consumer-register-ids storm-conf)
         (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
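The shape of what metrics-consumer-bolt-specs produces can be sketched in Python. consumer_ids stands in for the output of metrics-consumer-register-ids, whose id format the article does not show. "__system" and "__metrics" are illustrative values for SYSTEM-COMPONENT-ID and METRICS-STREAM-ID.

```python
SYSTEM_COMPONENT_ID = "__system"  # illustrative
METRICS_STREAM_ID = "__metrics"   # illustrative


def metrics_consumer_bolt_specs(component_ids, consumer_ids, registrations):
    """Like metrics-consumer-bolt-specs: one bolt spec per registered consumer."""
    emitters = [SYSTEM_COMPONENT_ID] + list(component_ids)
    inputs = {(cid, METRICS_STREAM_ID): "shuffle" for cid in emitters}
    specs = []
    # zip stops at the shorter list, as Clojure's two-collection map does
    for consumer_id, register in zip(consumer_ids, registrations):
        p = register.get("parallelism.hint")
        p = 1 if p is None else p
        specs.append((consumer_id, {
            "class": register["class"],
            "argument": register.get("argument"),
            "inputs": inputs,
            "p": p,
            "conf": {"topology.tasks": p},
        }))
    return specs
```

When topology.metrics.consumer.register is empty, the map yields nothing and no consumer bolts are added.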


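In metrics-consumer-bolt-specs:

- component-ids-that-emit-metrics is bound to the ids of all spouts and bolts plus the system bolt.
- inputs is bound to the input streams of the metrics consumer bolt, all using shuffle grouping.
- mk-bolt-spec is bound to an anonymous function.
- metrics-consumer-register-ids generates one component id per registered metrics consumer.
- (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER) returns the registered metrics consumers.

map pairs the two into a list of [component-id bolt-spec] entries ([component-id bolt-spec] [component-id bolt-spec] ...).

add-system-components! adds the system bolt to the topology definition. The system bolt collects statistics about the worker process, such as memory usage, GC activity and network throughput. Each worker process has exactly one system bolt. add-system-components! is defined as follows: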


    (defn add-system-components! [conf ^StormTopology topology]
      (let [system-bolt-spec (thrift/mk-bolt-spec*
                              {}
                              (SystemBolt.)
                              {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
                               METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
                              :p 0
                              :conf {TOPOLOGY-TASKS 0})]
        (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))




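The arguments to thrift/mk-bolt-spec* describe the system bolt:

- The first argument, {}, shows that the system bolt has no input streams.
- The third argument shows that it has two output streams for sending tick tuples.
- Its parallelism is 0. The system bolt belongs to the worker process as a whole, so there is no need to specify a parallelism, and it does not run any tasks either.

add-metric-streams! adds the metrics stream definition to the topology. It is defined as follows: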


    (defn add-metric-streams! [^StormTopology topology]
      (doseq [[_ component] (all-components topology)
              :let [common (.get_common component)]]
        (.put_to_streams common METRICS-STREAM-ID
                         (thrift/output-fields ["task-info" "data-points"]))))
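The order of the steps in system-topology! matters. add-metric-streams! runs after the acker and the system bolt have been added, so those components get a metrics stream as well. A Python sketch over a dict-based topology shows the effect. The tick stream keys are named after the Clojure constants because their string values are not shown in the article. "__system" and "__metrics" are illustrative values.

```python
SYSTEM_COMPONENT_ID = "__system"  # illustrative
METRICS_STREAM_ID = "__metrics"   # illustrative


def all_components(topology):
    """Spouts and bolts in one dict; values are shared, so mutations stick."""
    return {**topology["spouts"], **topology["bolts"]}


def add_system_components(topology):
    topology["bolts"][SYSTEM_COMPONENT_ID] = {
        "inputs": {},  # no input streams
        # keys named after the Clojure constants, not their real string values
        "streams": {"SYSTEM-TICK-STREAM-ID": ["rate_secs"],
                    "METRICS-TICK-STREAM-ID": ["interval"]},
        "p": 0,                         # parallelism 0 ...
        "conf": {"topology.tasks": 0},  # ... and no tasks
    }


def add_metric_streams(topology):
    # runs after add_system_components, so the system bolt gets the stream too
    for comp in all_components(topology).values():
        comp["streams"][METRICS_STREAM_ID] = ["task-info", "data-points"]
```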




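This adds the metrics stream identified by METRICS-STREAM-ID to every spout and bolt. add-system-streams! is similar to add-metric-streams!: it adds the system stream identified by SYSTEM-STREAM-ID to every spout and bolt.

After calling system-topology!, submitTopologyWithOpts first takes the lock and then calls setup-storm-code. That function copies the jar uploaded to nimbus, the topology and its configuration into {storm.local.dir}/nimbus/stormdist/{topology id}. It is defined as follows: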


    (defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology]
      (let [stormroot (master-stormdist-root conf storm-id)]
        (FileUtils/forceMkdir (File. stormroot))
        (FileUtils/cleanDirectory (File. stormroot))
        (setup-jar conf tmp-jar-location stormroot)
        (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
        (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
        ))




setup-jar copies the jar from {storm.local.dir}/nimbus/inbox/ into {storm.local.dir}/nimbus/stormdist/{topology id} under the name stormjar.jar. FileUtils/writeByteArrayToFile saves the serialized topology object and storm-conf to stormcode.ser and stormconf.ser respectively.

setup-heartbeats! is defined in cluster.clj as one of the functions of the StormClusterState protocol. It creates the ZooKeeper node under which this topology's heartbeats are stored:
/storm/workerbeats/{topology id}/
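The resulting directory layout can be reproduced with a Python sketch that writes into any local directory. pickle stands in for Utils/serialize (Java serialization), and the storm id and paths are example values.

```python
import os
import pickle
import shutil


def setup_storm_code(local_dir, storm_id, tmp_jar_location, storm_conf, topology):
    """Lay out {local_dir}/nimbus/stormdist/{storm_id} like setup-storm-code."""
    stormroot = os.path.join(local_dir, "nimbus", "stormdist", storm_id)
    os.makedirs(stormroot, exist_ok=True)  # FileUtils/forceMkdir
    for name in os.listdir(stormroot):     # FileUtils/cleanDirectory
        path = os.path.join(stormroot, name)
        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.remove(path)
    shutil.copyfile(tmp_jar_location, os.path.join(stormroot, "stormjar.jar"))  # setup-jar
    # pickle stands in for Utils/serialize (Java serialization)
    with open(os.path.join(stormroot, "stormcode.ser"), "wb") as f:
        pickle.dump(topology, f)
    with open(os.path.join(stormroot, "stormconf.ser"), "wb") as f:
        pickle.dump(storm_conf, f)
    return stormroot
```

Called with the jar from nimbus/inbox, it leaves exactly stormjar.jar, stormcode.ser and stormconf.ser under nimbus/stormdist/{storm id}.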
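start-storm reads the nimbus (cluster) configuration. It deserializes the topology configuration from stormconf.ser and the topology from stormcode.ser, and counts the executors of every component (num-executors). It then calls activate-storm! to write the topology's metadata, a StormBase object, to /storm/storms/{topology id} in ZooKeeper. It is defined as follows: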


    (defn- start-storm [nimbus storm-name storm-id topology-initial-status]
      {:pre [(#{:active :inactive} topology-initial-status)]}
      (let [storm-cluster-state (:storm-cluster-state nimbus)
            conf (:conf nimbus)
            storm-conf (read-storm-conf conf storm-id)
            topology (system-topology! storm-conf (read-storm-topology conf storm-id))
            num-executors (->> (all-components topology) (map-val num-start-executors))]
        (log-message "Activating " storm-name ": " storm-id)
        (.activate-storm! storm-cluster-state
                          storm-id
                          (StormBase. storm-name
                                      (current-time-secs)
                                      {:type topology-initial-status}
                                      (storm-conf TOPOLOGY-WORKERS)
                                      num-executors))))
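Here is a Python sketch of the record that start-storm writes through activate-storm!. The dict-based topology is assumed to be the output of system-topology!, which is why it contains __acker and __system. Executor counts follow the num-start-executors rule, where an unset hint counts as 1. The worker count and timestamp are example values.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class StormBase:
    storm_name: str
    launch_time_secs: int
    status: Dict[str, str]
    num_workers: int
    component_executors: Dict[str, int]  # component->executors in common.clj


def start_storm(storm_name, storm_conf, topology, initial_status, now_secs):
    """Build the record that start-storm hands to activate-storm!."""
    if initial_status not in ("active", "inactive"):  # the :pre condition
        raise AssertionError("bad initial status: %r" % initial_status)
    num_executors = {}
    for cid, comp in topology.items():
        hint = comp.get("parallelism_hint")
        num_executors[cid] = 1 if hint is None else hint  # num-start-executors
    return StormBase(storm_name, now_secs, {"type": initial_status},
                     storm_conf["topology.workers"], num_executors)
```

Once this record is stored under /storm/storms/{topology id}, storm-active? finds the name. A second submission with the same name then fails check-storm-active!.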




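Finally, submitTopologyWithOpts calls mk-assignments to assign tasks. Task assignment is an important part of Storm's architecture. For reasons of length, its source code will be analyzed in a later article.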






3 replies

feng01301218, posted 2015-4-5 14:20:54

ainubis, posted 2015-4-5 18:33:35

hapjin, posted 2015-6-23 14:58:52
Learned a lot from this. Thanks.