导读:
本文只要明白下面问题即可
如何集成MetaQ到项目代码中?
1 文档说明
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。
关于MetaQ生产者及消费者实例,参考自官方生产者与消费者实例:
https://github.com/killme2008/Metamorphosis/tree/master/metamorphosis-storm-spout
关于MetaQ与Storm之间的spout接口参考自官方MetaQ与storm的接口:
https://github.com/killme2008/Metamorphosis/tree/master/metamorphosis-storm-spout
MetaQ作为一种开源的消息中间件,有完善的开源社区,并且版本更新稳定,作为国人的开源软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ作为Storm的消息源之一是必须掌握的。
该模拟项目中,使用MetaQ作为storm的消息源,MetaqSpout从指定的zkconnect及topic中读取数据,发布到节点中。此外,还写了MetaQ与storm的生产者接口,即MetaqBolt指定Topic将数据写入Metaq中供其他业务系统继续使用。
好了文档说明就这些了,代码马上会随文档更新(通常情况下,代码调试成功了我才会写文档的)。
//源代码见首页“代买GIT”
2 MetaQ与Storm接口
2.1 MetaqSpout
2.1.1 接口说明
该接口参考自Github,作了部分修改。项目设计中,使用storm.xml.MetaqSpoutXml读取MetaqSpout对应的配置文件MetaqSpout.xml
配置文件中,指明zkconnect的地址及端口号、metaq的root目录、对应的消费topic及其消费组(这个很重要)。
读取配置之后,将配置传递到spout的open部分进行初始化工作,主要是进行消费者参数设定(包括zkconnect、root目录、Topic及Group设置)等。
在nextTuple方法中,进行消息(message)拉取(poll),每次拉取一条记录,发布到下一个拓扑节点中。
2.1.2 上代码贴部分主要代码(详细参考代码包):
- //构造函数,传递xml地址
- public MetaqSpout(String MetaqSpoutXml) {
- super();
- this.metaqspoutxml = MetaqSpoutXml;
- }
- //实例化参数配置类
- private ZKConfig zkConfig = new ZKConfig();
- private MetaClientConfig metaClientConfig = new MetaClientConfig();
- private final Scheme scheme = new StringScheme();
- //初始化调用
- public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
- //从xml中获取参数
- new MetaqSpoutXml(this.metaqspoutxml).read();
- this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;//"192.168.2.240:2181";
- this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;//"/meta";
- String topic = MetaqSpoutXml.topic;
- String group = MetaqSpoutXml.group;
-
- this.metaClientConfig.setZkConfig(this.zkConfig);
- this.consumerConfig = new ConsumerConfig(group);
-
- //final String topic = (String) conf.get(TOPIC);
- if (topic == null) {
- throw new IllegalArgumentException(TOPIC + " is null");
- }
- Integer maxSize = (Integer) conf.get(FETCH_MAX_SIZE);
- if (maxSize == null) {
- log.warn("Using default FETCH_MAX_SIZE");
- maxSize = DEFAULT_MAX_SIZE;
- }
- this.id2wrapperMap = new ConcurrentHashMap();
- this.messageQueue = new LinkedTransferQueue();
- try {
- this.collector = collector;
- this.setUpMeta(topic, maxSize);
- }
- catch (final MetaClientException e) {
- log.error("Setup meta consumer failed", e);
- }
- }
- private void setUpMeta(final String topic, final Integer maxSize) throws MetaClientException {
- this.sessionFactory = new MetaMessageSessionFactory(this.metaClientConfig);
- this.messageConsumer = this.sessionFactory.createConsumer(this.consumerConfig);
- this.messageConsumer.subscribe(topic, maxSize, new MessageListener() {
- public void recieveMessages(final Message message) {
- final MetaMessageWrapper wrapper = new MetaMessageWrapper(message);
- MetaqSpout.this.id2wrapperMap.put(message.getId(), wrapper);
- MetaqSpout.this.messageQueue.offer(wrapper);
- try {
- wrapper.latch.await();
- }
- catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- // 消费失败,抛出运行时异常
- if (!wrapper.success) {
- throw new RuntimeException("Consume message failed");
- }
- }
- public Executor getExecutor() {
- return null;
- }
- }).completeSubscribe();
- }
- //关闭时调用,进行consumer的shutdown操作
- public void close() {
- try {
- this.messageConsumer.shutdown();
- }
- catch (final MetaClientException e) {
- log.error("Shutdown consumer failed", e);
- }
- try {
- this.sessionFactory.shutdown();
- }
- catch (final MetaClientException e) {
- log.error("Shutdown session factory failed", e);
- }
- }
- //消息发布
- public void nextTuple() {
- if (this.messageConsumer != null) {
- try {
- //进行消息拉取
- final MetaMessageWrapper wrapper = this.messageQueue.poll(WAIT_FOR_NEXT_MESSAGE, TimeUnit.MILLISECONDS);
- if (wrapper == null) {
- return;
- }
- final Message message = wrapper.message;
- this.collector.emit(this.scheme.deserialize(message.getData()), message.getId());
- }
- catch (final InterruptedException e) {
- e.printStackTrace();
- }
-
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //消息操作成功确认机制
- public void ack(final Object msgId) {
- if (msgId instanceof Long) {
- final long id = (Long) msgId;
- final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
- if (wrapper == null) {
- log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
- return;
- }
- wrapper.success = true;
- wrapper.latch.countDown();
- }
- else {
- log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
- }
- }
- //消费失败时返回
- public void fail(final Object msgId) {
- if (msgId instanceof Long) {
- final long id = (Long) msgId;
- final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
- if (wrapper == null) {
- log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
- return;
- }
- wrapper.success = false;
- wrapper.latch.countDown();
- }
- else {
- log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
- }
- }
复制代码
2.2 MetaqBolt
2.2.1 接口说明
该部分代码修改自Github上的Metaq异步生产者实例。设计这个Bolt的原因是,部分业务有这种需求,当经过storm实时处理后,数据发送到下一个业务系统,当下一个业务系统也是从metaq拉取数据时,就需要我们把处理过的数据写入到metaq中去,所以有了这个接口。
其读取配置文件的过程与MetaqSpout相似,但是没有组(Group)的概念,只需指定地址、目录及Topic(前提是Metaq上有该Topic),则可以把数据写入metaq中。
2.2.1 上代码该部分代码较简单
可以参考AsyncConsumer代码。
- //构造,传递配置路径
- public MetaqBolt(String MetaqSpoutXml) {
- super();
- this.metaqspoutxml = MetaqSpoutXml;
- }
- //初始化操作
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- System.out.println("MetaqBolt -- Start!");
- this.collector = collector;
- // 初始化metaq的一些设置,包括zk链接地址,根目录等
- this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;// "192.168.2.240:2181";
- this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;// "/meta";
- this.topic = MetaqSpoutXml.topic;
- this.metaClientConfig.setZkConfig(this.zkConfig);
- try {
- this.sessionFactory = new MetaMessageSessionFactory(
- this.metaClientConfig);
- } catch (MetaClientException e) {
- e.printStackTrace();
- }
- this.producer = this.sessionFactory.createProducer();
- this.producer.publish(this.topic);// 发布topic
- }
- public void execute(Tuple input) {
- String str = input.getString(0);
- try {
- this.sendResult = producer.sendMessage(new Message(this.topic, str
- .getBytes()));
- } catch (MetaClientException | InterruptedException e) {
- e.printStackTrace();
- }
- //当生产失败时打印失败数据
- if (!this.sendResult.isSuccess()) {
- System.err.println("Send message failed,error message:"
- + sendResult.getErrorMessage());
- }
- }
复制代码
3 代码改动说明
关于此次代码变动较大,加了一个spout源的接口,一个bolt的数据落地接口,对topology进行了优化。
具体如下:
(1) 增加了MetaqSpout接口,实现从MetaQ中读取数据(重点)
(2) 增加了MetaqBolt接口,实现新的数据落地接口,将数据写入MetaQ中
(3) 修改了Topology主类,实现了节点可配置,通过配置文件列表,即不同类型的spout及bolt可动态搭配,想要实现不同拓扑功能,不用修改代码,而只需修改配置即可(重点)
PS:详细见新更新的代码包,其内又最新的源码,需要的加群,从群共享中获取,或者博客留言。
//最新代码见首页"代码GIT"
4 关于Metaq的报错
之前调试代码时遇到一个错误,纠结了很久,后面还是群里的一个朋友指点,才知道了错在哪里,所以把这个错误记载下来。
4.1 报错
PS:根据错误提示,总是没找到其原因。
4.2 解决在群里朋友的指点下,查看了metaq的启动日志。
才发现metaq往zk注册的服务器ip是192.168.122.1不是我本机的ip,之前对metaq进行配置的时候,并没有进行hostName配置,因为metaq据说默认的注册ip是localhost所以就没有注意了,但是好像这种情况来看,他进行zk注册的时候使用的是其代码内部的预留ip进行注册。
我在metaq的server.ini中进行了hostName配置,这个问题就解决了。
~~o(>_泪奔啊,他的错误提示太不科学了。。。。
相关文档
Storm项目:流数据监控一流数据监控设计文档
http://www.aboutyun.com/thread-10042-1-1.html
Storm项目:流数据监控二:流数据监控代码详解
http://www.aboutyun.com/thread-10047-1-1.html
Storm项目:流数据监控三:流数据监控示例运行
http://www.aboutyun.com/thread-10046-1-1.html
Storm项目:流数据监控五Zookeeper统一配置
http://www.aboutyun.com/thread-10044-1-1.html
实时处理方案架构 - Storm实时处理
http://www.aboutyun.com/thread-10043-1-1.html
本文地址:http://www.blogchong.com/post/storm_monitor_metaq_api.html
|