分享

Storm ack和fail机制个人经验总结

howtodown 发表于 2014-10-15 09:30:21 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 53559
本文为个人经验性总结





首先开启storm tracker机制的前提是,

1. 在spout emit tuple的时候,要加上第3个参数messageid
2. 在配置中acker数目至少为1
3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路



流程,

1. 当tuple具有messageid时,spout会把该tuple加到pending list里面
   并发消息给acker,通知acker开始tracker这条tuple

2. 然后再后续的bolt的处理逻辑中,你必须显式的ack或fail所有处理的tuple
   如果这条tuple在整个DAG图上都成功执行了,那么acker会发现该tuple的track异或值为0
   于是acker会发ack_message给spout
   当然如果在DAG图上任意一个节点bolt上fail,那么acker会认为该tuple fail
   于是acker会发fail_message给spout

3. 当spout收到ack或fail message如何处理,
    首先是从pending list里面删掉这条tuple,因为无论ack或fail,只要得到结果,这条tuple就没有继续被cache的必要了
    然后做的事是调用spout.ack或spout.fail
    所以系统默认是不会做任何事的,甚至是fail后的重发,你也需要在fail里面自己实现
    如何实现后面看

4. 如果一条tuple没有被ack或fail,最终是会超时的
    Spout会根据system tick去rotate pending list,对于每个过时的tuple,都调用spout.fail



下面的问题就是如何做fail重发,

这个必须用户通过自己处理fail来做,系统是不会自己做的,


  1. public void fail(Object msgId)
复制代码



看看系统提供的接口,只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,用户如何取得原来的msg

貌似需要自己cache,然后用这个msgId去查询,太坑爹了

阿里自己的Jstorm会提供


  1. public interface IFailValueSpout { void fail(Object msgId, List<object>values); }
复制代码




这样更合理一些, 可以直接取得系统cache的msg values





已有(3)人评论

跳转到指定楼层
howtodown 发表于 2014-10-15 09:37:00
补充一些相关内容:

如果一个消息被完全处理或完全处理失败会发生什么

首先,让我们看看Spout的元组的生命周期。ISpout接口的定义如下:

  1. public interface ISpout extends Serializable {  
  2.     void open(Map conf, TopologyContext context, SpoutOutputCollector colle- ctor);  
  3.     void close();  
  4.     void nextTuple();  
  5.     void ack(Object msgId);  
  6.     void fail(Object msgId);  
  7. }
复制代码

首先,Storm通过调用Spout的nextTuple()方法从Spout请求一个元组。Spout使用open()方法提供的SpoutOutputCollector对象发射一个元组到它的输出流。当发射元组时,Spout会提供一个“消息id”,以便用来识别元组。例如,KestrelSpout从Kestrel消息队列中读取一个消息时,会发射Kestrel提供的“消息id”。下面发射一个消息到SpoutOutputCollector对象:

  1. _collector.emit(new Values("field1", "field2", 3) , msgId);
复制代码


接下来,元组被发送到Bolt,同时Storm负责跟踪创建的消息树。如果Storm检测到一个元组是完全处理的,Storm将调用原Spout任务的ack()方法,把Spout提供给Storm的消息id作为输入参数。同样,如果元组超时,Storm将调用Spout的fail()方法。注意,一个元组将由Spout任务来确认成功或失败,这个Spout任务是创建此元组的完全相同的Spout任务。如果一个Spout跨集群执行很多任务,元组是不会被创建它的那个任务外的其他任务确认成功或失败的。

当KestrelSpout从Kestrel队列取出一个消息时,它将“打开”消息。这意味着消息实际上还没有从队列取出,而是放在“待定”状态等待确认消息已经完成。当在“待定”状态时,一个消息将不会被发送到其他消费者队列。如果客户端断开连接,所有“待定”状态的消息会放回到队列。当消息被打开时,Kestrel向客户端提供信息的数据以及消息的惟一id。当发射元组到SpoutOutputCollector时,KestrelSpout使用精确的id作为元组的“消息id”。之后,当KestrelSpout的ack()或者fail()方法被调用时,KestrelSpout会把一个ack或fail消息以及此消息的id一起发送到Kestrel,以便从队列取出或恢复。








回复

使用道具 举报

muyannian 发表于 2014-12-24 14:10:46
storm的ack和fail:


为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪, 这里面涉及到ack/fail的处理, 如果一个tuple处理成功, 会调用spout的ack方法, 如果失败, 会调用fail方法. 而在处理tuple的每一个bolt都会通过OutputCollector来告知storm, 当前bolt处理是否成功. 为了了解OutputCollector的ack/fail与Spout的ack/fail之间的关系, 我调试跟踪了一下storm代码.

IBasicBolt 实现类不关心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail来决定. 其execute方法的BasicOutputCollector参数也没有提供ack/fail方法给你调用. 相当于忽略了该bolt的ack/fail行为. 所以IBasicBolt用来做filter或者简单的计算比较合适.

可以参考BasicBoltExecutor代码里面的实现就可以明白了:

  1.     public void execute(Tuple input) {
  2.         _collector.setContext(input);
  3.         try {
  4.             _bolt.execute(input, _collector);
  5.             _collector.getOutputter().ack(input);
  6.         } catch(FailedException e) {
  7.             LOG.warn("Failed to process tuple", e);
  8.             _collector.getOutputter().fail(input);
  9.         }
  10.     }
复制代码




在IRichBolt实现类中, 如果OutputCollector.emit(oldTuple, newTuple)这样调用来发射tuple(在storm中称之为anchoring), 那么后面的bolt的ack/fail会影响spout的ack/fail, 如果collector.emit(newTuple)这样来发射tuple(在storm称之为unanchoring), 则相当于断开了后面bolt的ack/fail对spout的影响.spout将立即根据当前bolt前面的ack/fail的情况来决定调用spout的ack/fail. 所以某个bolt后面的bolt的成功失败对你来说不关心, 你可以直接通过这种方式来忽略

中间的某个bolt fail了, 不会影响后面的bolt执行, 但是会立即触发spout的fail. 相当于短路了, 后面bolt虽然也执行了, 但是ack/fail对spout已经无意义了. 也就是说, 只要bolt集合中的任何一个fail了, 会立即触发spout的fail方法. 而ack方法需要所有的bolt调用为ack才能触发.

另外一点, storm只是通过ack/fail机制来告诉应用方bolt中间的处理情况, 对于成功/失败该如何处理, 必须由应用自己来决定, 因为storm内部也没有保存失败的具体数据, 但是也有办法知道失败记录, 因为spout的ack/fail方法会附带一个msgId对象, 我们可以在最初发射tuple的时候将将msgId设置为tuple, 然后在ack/fail中对该tuple进行处理.

这里有个问题, 就是每个bolt执行完之后要显式的调用ack/fail, 否则会出现tuple不释放导致oom. 不知道storm在最初设计的时候, 为什么不将bolt的ack设置为默认调用





回复

使用道具 举报

醉半城 发表于 2016-11-6 19:32:24
谢谢分享经验
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条