分享

Storm项目:流数据监控四---流数据监控MetaQ接口

tntzbzc 发表于 2014-11-16 14:45:12 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 15922

导读:
本文只要明白下面问题即可
如何集成MetaQ到项目代码中?







1 文档说明
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。

关于MetaQ生产者及消费者实例,参考自官方生产者与消费者实例:
https://github.com/killme2008/Metamorphosis/tree/master/metamorphosis-storm-spout
关于MetaQ与Storm之间的spout接口参考自官方MetaQ与storm的接口:
https://github.com/killme2008/Metamorphosis/tree/master/metamorphosis-storm-spout

MetaQ作为一种开源的消息中间件,有完善的开源社区,并且版本更新稳定,作为国人的开源软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ作为Storm的消息源之一是必须掌握的。

该模拟项目中,使用MetaQ作为storm的消息源,MetaqSpout从指定的zkconnect及topic中读取数据,发布到节点中。此外,还写了MetaQ与storm的生产者接口,即MetaqBolt指定Topic将数据写入Metaq中供其他业务系统继续使用。

好了文档说明就这些了,代码马上会随文档更新(通常情况下,代码调试成功了我才会写文档的)。
//源代码见首页“代买GIT”

2 MetaQ与Storm接口
2.1 MetaqSpout

2.1.1 接口说明

该接口参考自Github,作了部分修改。项目设计中,使用storm.xml.MetaqSpoutXml读取MetaqSpout对应的配置文件MetaqSpout.xml


配置文件中,指明zkconnect的地址及端口号、metaq的root目录、对应的消费topic及其消费组(这个很重要)。
读取配置之后,将配置传递到spout的open部分进行初始化工作,主要是进行消费者参数设定(包括zkconnect、root目录、Topic及Group设置)等。
在nextTuple方法中,进行消息(message)拉取(poll),每次拉取一条记录,发布到下一个拓扑节点中。

2.1.2 上代码贴部分主要代码(详细参考代码包):
  1.     //构造函数,传递xml地址
  2.     public MetaqSpout(String MetaqSpoutXml) {
  3.         super();
  4.         this.metaqspoutxml = MetaqSpoutXml;
  5.     }
  6.     //实例化参数配置类
  7.     private ZKConfig zkConfig = new ZKConfig();
  8.     private MetaClientConfig metaClientConfig = new MetaClientConfig();
  9.     private final Scheme scheme = new StringScheme();
  10.     //初始化调用
  11.     public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
  12.      //从xml中获取参数
  13.         new MetaqSpoutXml(this.metaqspoutxml).read();
  14.      this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;//"192.168.2.240:2181";
  15.         this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;//"/meta";
  16.         String topic = MetaqSpoutXml.topic;
  17.         String group = MetaqSpoutXml.group;
  18.         
  19.         this.metaClientConfig.setZkConfig(this.zkConfig);
  20.         this.consumerConfig = new ConsumerConfig(group);
  21.         
  22.         //final String topic = (String) conf.get(TOPIC);
  23.         if (topic == null) {
  24.             throw new IllegalArgumentException(TOPIC + " is null");
  25.         }
  26.         Integer maxSize = (Integer) conf.get(FETCH_MAX_SIZE);
  27.         if (maxSize == null) {
  28.             log.warn("Using default FETCH_MAX_SIZE");
  29.             maxSize = DEFAULT_MAX_SIZE;
  30.         }
  31.         this.id2wrapperMap = new ConcurrentHashMap();
  32.         this.messageQueue = new LinkedTransferQueue();
  33.         try {
  34.             this.collector = collector;
  35.             this.setUpMeta(topic, maxSize);
  36.         }
  37.         catch (final MetaClientException e) {
  38.             log.error("Setup meta consumer failed", e);
  39.         }
  40.     }
  41.     private void setUpMeta(final String topic, final Integer maxSize) throws MetaClientException {
  42.         this.sessionFactory = new MetaMessageSessionFactory(this.metaClientConfig);
  43.         this.messageConsumer = this.sessionFactory.createConsumer(this.consumerConfig);
  44.         this.messageConsumer.subscribe(topic, maxSize, new MessageListener() {
  45.             public void recieveMessages(final Message message) {
  46.                 final MetaMessageWrapper wrapper = new MetaMessageWrapper(message);
  47.                 MetaqSpout.this.id2wrapperMap.put(message.getId(), wrapper);
  48.                 MetaqSpout.this.messageQueue.offer(wrapper);
  49.                 try {
  50.                     wrapper.latch.await();
  51.                 }
  52.                 catch (final InterruptedException e) {
  53.                     Thread.currentThread().interrupt();
  54.                 }
  55.                 // 消费失败,抛出运行时异常
  56.                 if (!wrapper.success) {
  57.                     throw new RuntimeException("Consume message failed");
  58.                 }
  59.             }
  60.             public Executor getExecutor() {
  61.                 return null;
  62.             }
  63.         }).completeSubscribe();
  64.     }
  65.     //关闭时调用,进行consumer的shutdown操作
  66.     public void close() {
  67.         try {
  68.             this.messageConsumer.shutdown();
  69.         }
  70.         catch (final MetaClientException e) {
  71.             log.error("Shutdown consumer failed", e);
  72.         }
  73.         try {
  74.             this.sessionFactory.shutdown();
  75.         }
  76.         catch (final MetaClientException e) {
  77.             log.error("Shutdown session factory failed", e);
  78.         }
  79.     }
  80.     //消息发布
  81.     public void nextTuple() {
  82.         if (this.messageConsumer != null) {
  83.             try {
  84.               //进行消息拉取
  85.                 final MetaMessageWrapper wrapper = this.messageQueue.poll(WAIT_FOR_NEXT_MESSAGE, TimeUnit.MILLISECONDS);
  86.                 if (wrapper == null) {
  87.                     return;
  88.                 }
  89.                 final Message message = wrapper.message;
  90.                 this.collector.emit(this.scheme.deserialize(message.getData()), message.getId());
  91.             }
  92.             catch (final InterruptedException e) {
  93.               e.printStackTrace();
  94.             }
  95.             
  96.         }
  97.         try {
  98.             Thread.sleep(100);
  99.         } catch (InterruptedException e) {
  100.             e.printStackTrace();
  101.         }
  102.     }
  103.     //消息操作成功确认机制
  104.     public void ack(final Object msgId) {
  105.         if (msgId instanceof Long) {
  106.             final long id = (Long) msgId;
  107.             final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
  108.             if (wrapper == null) {
  109.                 log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
  110.                 return;
  111.             }
  112.             wrapper.success = true;
  113.             wrapper.latch.countDown();
  114.         }
  115.         else {
  116.             log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
  117.         }
  118.     }
  119.     //消费失败时返回
  120.     public void fail(final Object msgId) {
  121.         if (msgId instanceof Long) {
  122.             final long id = (Long) msgId;
  123.             final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
  124.             if (wrapper == null) {
  125.                 log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
  126.                 return;
  127.             }
  128.             wrapper.success = false;
  129.             wrapper.latch.countDown();
  130.         }
  131.         else {
  132.             log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
  133.         }
  134.     }
复制代码


2.2 MetaqBolt

2.2.1 接口说明

该部分代码修改自Github上的Metaq异步生产者实例。设计这个Bolt的原因是,部分业务有这种需求,当经过storm实时处理后,数据发送到下一个业务系统,当下一个业务系统也是从metaq拉取数据时,就需要我们把处理过的数据写入到metaq中去,所以有了这个接口。
其读取配置文件的过程与MetaqSpout相似,但是没有组(Group)的概念,只需指定地址、目录及Topic(前提是Metaq上有该Topic),则可以把数据写入metaq中。

2.2.1 上代码该部分代码较简单

可以参考AsyncConsumer代码。
  1.   //构造,传递配置路径
  2.     public MetaqBolt(String MetaqSpoutXml) {
  3.         super();
  4.         this.metaqspoutxml = MetaqSpoutXml;
  5.     }
  6.     //初始化操作
  7.     public void prepare(Map stormConf, TopologyContext context,
  8.             OutputCollector collector) {
  9.         System.out.println("MetaqBolt   --  Start!");
  10.         this.collector = collector;
  11.         // 初始化metaq的一些设置,包括zk链接地址,根目录等
  12.         this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;// "192.168.2.240:2181";
  13.         this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;// "/meta";
  14.         this.topic = MetaqSpoutXml.topic;
  15.         this.metaClientConfig.setZkConfig(this.zkConfig);
  16.         try {
  17.             this.sessionFactory = new MetaMessageSessionFactory(
  18.                     this.metaClientConfig);
  19.         } catch (MetaClientException e) {
  20.             e.printStackTrace();
  21.         }
  22.         this.producer = this.sessionFactory.createProducer();
  23.         this.producer.publish(this.topic);// 发布topic
  24.     }
  25.     public void execute(Tuple input) {
  26.         String str = input.getString(0);
  27.         try {
  28.             this.sendResult = producer.sendMessage(new Message(this.topic, str
  29.                     .getBytes()));
  30.         } catch (MetaClientException | InterruptedException e) {
  31.             e.printStackTrace();
  32.         }
  33.         //当生产失败时打印失败数据
  34.         if (!this.sendResult.isSuccess()) {
  35.             System.err.println("Send message failed,error message:"
  36.                     + sendResult.getErrorMessage());
  37.         }
  38.     }
复制代码



3 代码改动说明

关于此次代码变动较大,加了一个spout源的接口,一个bolt的数据落地接口,对topology进行了优化。
具体如下:
(1) 增加了MetaqSpout接口,实现从MetaQ中读取数据(重点
(2) 增加了MetaqBolt接口,实现新的数据落地接口,将数据写入MetaQ中
(3) 修改了Topology主类,实现了节点可配置,通过配置文件列表,即不同类型的spout及bolt可动态搭配,想要实现不同拓扑功能,不用修改代码,而只需修改配置即可(重点

PS:详细见新更新的代码包,其内又最新的源码,需要的加群,从群共享中获取,或者博客留言。
//最新代码见首页"代码GIT"




4 关于Metaq的报错

之前调试代码时遇到一个错误,纠结了很久,后面还是群里的一个朋友指点,才知道了错在哪里,所以把这个错误记载下来。

4.1 报错

PS:根据错误提示,总是没找到其原因。

4.2 解决在群里朋友的指点下,查看了metaq的启动日志。




才发现metaq往zk注册的服务器ip是192.168.122.1不是我本机的ip,之前对metaq进行配置的时候,并没有进行hostName配置,因为metaq据说默认的注册ip是localhost所以就没有注意了,但是好像这种情况来看,他进行zk注册的时候使用的是其代码内部的预留ip进行注册。
我在metaq的server.ini中进行了hostName配置,这个问题就解决了。
~~o(>_泪奔啊,他的错误提示太不科学了。。。。

相关文档

Storm项目:流数据监控一流数据监控设计文档
http://www.aboutyun.com/thread-10042-1-1.html


Storm项目:流数据监控二:流数据监控代码详解
http://www.aboutyun.com/thread-10047-1-1.html

Storm项目:流数据监控三:流数据监控示例运行
http://www.aboutyun.com/thread-10046-1-1.html



Storm项目:流数据监控五Zookeeper统一配置
http://www.aboutyun.com/thread-10044-1-1.html

实时处理方案架构 - Storm实时处理
http://www.aboutyun.com/thread-10043-1-1.html


本文地址:http://www.blogchong.com/post/storm_monitor_metaq_api.html

已有(1)人评论

跳转到指定楼层
漂泊一剑客 发表于 2015-10-20 21:47:29
有时间的时候,我常来刷个分
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条