分享

Storm 的消息可靠性保证机制测试

fc013 发表于 2016-11-26 17:37:06 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 7024

问题导读:

1.Storm有哪几种消息保证机制?
2.如何实现可靠的 Bolt?
3.Trident 中,Spout 和 State 分别有哪几种状态?






Storm 是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成,从而为业务方决策提供实时的数据支持。

在美团点评公司内部,实时计算主要应用场景包括实时日志解析、用户行为分析、实时消息推送、消费趋势展示、实时新客判断、实时活跃用户数统计等。这些数据提供给各事业群,并作为他们实时决策的有力依据,弥补了离线计算“T+1”的不足。

在实时计算中,用户不仅仅关心时效性的问题,同时也关心消息处理的成功率。本文将通过实验验证 Storm 的消息可靠性保证机制,文章分为消息保证机制、测试目的、测试环境、测试场景以及总结等五节。

Storm 的消息保证机制
Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。

消息完全处理
每个从 Spout(Storm 中数据源节点)发出的 Tuple(Storm 中的最小消息单元)可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。 我们可以通过下面的例子来更好地诠释消息被完全处理这个概念:

[mw_shl_code=java,true]TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum);
builder.setBolt("split", new SplitSentence(), 10)
    .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
    .fieldsGrouping("split", new Fields("word"));[/mw_shl_code]

这个 Topology 从 Kafka(一个开源的分布式消息队列)读取信息发往下游,下游的 Bolt 将收到的句子分割成单独的单词,并进行计数。每一个从 Spout 发送出来的 Tuple 会衍生出多个新的 Tuple,从 Spout 发送出来的 Tuple 以及后续衍生出来的 Tuple 形成一棵 Tuple 树,下图是一棵 Tuple 树示例:

tuple_tree.png

上图中所有的 Tuple 都被成功处理了,我们才认为 Spout 发出的 Tuple 被完全处理。如果在一个固定的时间内(这个时间可以配置,默认为 30 秒),有至少一个 Tuple 处理失败或超时,则认为整棵 Tuple 树处理失败,即从 Spout 发出的 Tuple 处理失败。

如何实现不同层次的消息保证机制

spout_bolt_acker.png

Tuple 的完全处理需要 Spout、Bolt 以及 Acker(Storm 中用来记录某棵 Tuple 树是否被完全处理的节点)协同完成,如上图所示。从 Spout 发送 Tuple 到下游,并把相应信息通知给 Acker,整棵 Tuple 树中某个 Tuple 被成功处理了都会通知 Acker,待整棵 Tuple 树都被处理完成之后,Acker 将成功处理信息返回给 Spout;如果某个 Tuple 处理失败,或者超时,Acker 将会给 Spout 发送一个处理失败的消息,Spout 根据 Acker 的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传。

Storm 提供的三种不同消息保证机制中。利用 Spout、Bolt 以及 Acker 的组合我们可以实现 At Most Once 以及 At Least Once 语义,Storm 在 At Least Once 的基础上进行了一次封装(Trident),从而实现 Exactly Once 语义。

Storm 的消息保证机制中,如果需要实现 At Most Once 语义,只需要满足下面任何一条即可:

  • 关闭 ACK 机制,即 Acker 数目设置为 0
  • Spout 不实现可靠性传输
    • Spout 发送消息是使用不带 message ID 的 API
    • 不实现 fail 函数
  • Bolt 不把处理成功或失败的消息发送给 Acker

如果需要实现 At Least Once 语义,则需要同时保证如下几条:

  • 开启 ACK 机制,即 Acker 数目大于 0
  • Spout 实现可靠性传输保证
    • Spout 发送消息时附带 message 的 ID
    • 如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数
  • Bolt 在处理成功或失败后需要调用相应的方法通知 Acker

实现 Exactly Once 语义,则需要在 At Least Once 的基础上进行状态的存储,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现。

下图中,每种消息保证机制中左边的字母表示上游发送的消息,右边的字母表示下游接收到的消息。从图中可以知道,At Most Once 中,消息可能会丢失(上游发送了两个 A,下游只收到一个 A);At Least Once 中,消息不会丢失,可能重复(上游只发送了一个 B ,下游收到两个 B);Exactly Once 中,消息不丢失、不重复,因此需要在 At Least Once 的基础上保存相应的状态,表示上游的哪些消息已经成功发送到下游,防止同一条消息发送多次给下游的情况。

3_compare.png

测试目的
Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三种不同层次的消息保证机制,我们希望通过相关测试,达到如下目的:

  • 三种消息保证机制的表现,是否与官方的描述相符;
  • At Most Once 语义下,消息的丢失率和什么有关系、关系如何;
  • At Least Once 语义下,消息的重复率和什么有关系、关系如何。

测试环境
本文的测试环境如下: 每个 worker(worker 为一个 物理 JVM 进程,用于运行实际的 Storm 作业)分配 1 CPU 以及 1.6G 内存。Spout、Bolt、Acker 分别跑在单独的 worker 上。并通过在程序中控制抛出异常以及人工 Kill Spout/Bolt/Acker 的方式来模拟实际情况中的异常情况。

三种消息保证机制的测试均由 Spout 从 Kafka 读取测试数据,经由相应 Bolt 进行处理,然后发送到 Kafka,并将 Kafka 上的数据同步到 MySQL 方便最终结果的统计,如下图所示:

test-flow.png

测试数据为 Kafka 上顺序保存的一系列纯数字,数据量分别有十万、五十万、一百万等,每个数字在每个测试样例中出现且仅出现一次。

测试场景
对于三种不同的消息保证机制,我们分别设置了不同的测试场景,来进行充分的测试。其中为了保证 Spout/Bolt/Acker 发生异常的情况下不影响其他节点,在下面的测试中,所有的节点单独运行在独立的 Worker 上。

At Most Once
从背景中可以得知,如果希望实现 At Most Once 语义,将 Acker 的数目设置为 0 即可,本文的测试过程中通过把设置 Acker 为 0 来进行 At Most Once 的测试。

输入数据
保存在 Kafka 上的一系列纯数字,数据量从十万到五百万不等,每个测试样例中,同一个数字在 Kafka 中出现且仅出现一次。

测试结果

异常次数
测试数据总量
结果集中不同 Tuple 的总量
丢失的 Tuple 数据量
Tuple 的丢失百分比
Tuple 的重复量
050000050000000%0
01000000100000000%0
02000000200000000%0
03000000300000000%0

异常次数
测试数据总量
结果集中不同 Tuple 的总量
丢失的 Tuple 数据量
Tuple 的丢失百分比
Tuple 的重复量
1300000027749402250607.50%0
23000000230708769291323.09%0
33000000208282391717730.57%0
430000001420725157927552.64%0
结论
不发生异常的情况下,消息能够不丢不重;Bolt 发生异常的情况下,消息会丢失,不会重复,其中消息的丢失数目与异常次数正相关。与官方文档描述相符,符合预期。

At Least Once
为了实现 At Least Once 语义,需要 Spout、Bolt、Acker 进行配合。我们使用 Kafka-Spout 并通过自己管理 offset 的方式来实现可靠的 Spout;Bolt 通过继承 BaseBasicBolt,自动帮我们建立 Tuple 树以及消息处理之后通知 Acker;将 Acker 的数目设置为 1,即打开 ACK 机制,这样整个 Topology 即可提供 At Least Once 的语义。

测试数据
Kafka 上保存的十万到五十万不等的纯数字,其中每个测试样例中,每个数字在 Kafka 中出现且仅出现一次。

测试结果
Acker 发生异常的情况

异常的次数
测试数据总量
结果集中不重复的 Tuple 数
数据重复的次数(>1)
出现重复的 Tuple 数
数据丢失数量
最大积压量
0100000100000--02000(默认值)
0200000200000--02000
0300000300000--02000
0400000400000--02000

异常的次数
测试数据总量
结果集中不重复的 Tuple 数
数据重复的次数(>1)
出现重复的 Tuple 数
数据丢失数量
最大积压量
11000001000002200002000
21000001000002400102000
31000001000002600002000
41000001000002800002000

Spout 发生异常的情况

异常的次数
测试数据总量
结果集中不重复的 Tuple 数
数据重复的次数(>1)
出现重复的 Tuple 数
数据丢失数量
0100000100000--0
0200000200000--0
0300000300000--0
0400000400000--0

异常的次数
测试数据总量
结果集中不重复的 Tuple 数
数据重复的次数(>1)
出现重复的 Tuple 数
数据丢失数量
1100000100000220520
2100000100000244140
4100000100000290080
6100000100000296900
316750

Bolt 发生异常的情况

调用 emit 函数之前发生异常

异常次数
结果集中不重复的 Tuple 数
数据重复的次数 (>1)
出现重复的 Tuple 数
数据丢失量
0100000--0
0200000--0
0300000--0
0400000--0

异常次数
结果集中不重复的 Tuple 数
数据重复的次数 (>1)
出现重复的 Tuple 数
数据丢失量
1100000--0
2100000--0
4100000--0
8100000--0
10100000--0

调用 emit 函数之后发生异常

异常次数
结果集中不重复的 Tuple 数
数据重复的次数(>1)
出现重复的 Tuple 数
数据丢失数量
0100000--0
0200000--0
0300000--0
0400000--0

异常次数
结果集中不重复的 Tuple 数
数据重复的次数(>1)
出现重复的 Tuple 数
数据丢失数量
1100000220
2100000230
4100000250
8100000290
101000002110
结论
从上面的表格中可以得到,消息不会丢失,可能发生重复,重复的数目与异常的情况相关。

  • 不发生任何异常的情况下,消息不会重复不会丢失。
  • Spout 发生异常的情况下,消息的重复数目约等于 spout.max.pending(Spout 的配置项,每次可以发送的最多消息条数) * NumberOfException(异常次数)。
  • Acker 发生异常的情况下,消息重复的数目等于 spout.max.pending * NumberOfException。
  • Bolt 发生异常的情况:
    • emit 之前发生异常,消息不会重复。
    • emit 之后发生异常,消息重复的次数等于异常的次数。

结论与官方文档所述相符,每条消息至少发送一次,保证数据不会丢失,但可能重复,符合预期。

Exactly Once
对于 Exactly Once 的语义,利用 Storm 中的 Trident 来实现。

测试数据
Kafka 上保存的一万到一百万不等的数字,每个数字在每次测试样例中出现且仅出现一次。

测试结果
Spout 发生异常情况

异常数
测试数据量
结果集中不重复的 Tuple 数
结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

Acker 发生异常的情况

异常数
测试数据量
结果集中不重复的 Tuple 数
结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000

Bolt 发生异常的情况

异常数
测试数据量
结果集中不重复的 Tuple 数
结果集中所有 Tuple 的总和
1100001000050005000
2100001000050005000
3100001000050005000
结论
在所有情况下,最终结果集中的消息不会丢失,不会重复,与官方文档中的描述相符,符合预期。

总结
对 Storm 提供的三种不同消息保证机制,用户可以根据自己的需求选择不同的消息保证机制。

不同消息可靠性保证的使用场景
对于 Storm 提供的三种消息可靠性保证,优缺点以及使用场景如下所示:

可靠性保证层次
优点
缺点
使用场景
At most once处理速度快数据可能丢失都处理速度要求高,且对数据丢失容忍度高的场景
At least once数据不会丢失数据可能重复不能容忍数据丢失,可以容忍数据重复的场景
Exactly once数据不会丢失,不会重复处理速度慢对数据不丢不重性质要求非常高,且处理速度要求没那么高,比如支付金额
如何实现不同层次的消息可靠性保证
对于 At Least Once 的保证需要做如下几步:

  • 需要开启 ACK 机制,即 Topology 中的 Acker 数量大于零;
  • Spout 是可靠的。即 Spout 发送消息的时候需要附带 msgId,并且实现失败消息重传功能(fail 函数 ,可以参考下面的 Spout 代码);
  • Bolt 在发送消息时,需要调用 emit(inputTuple, outputTuple)进行建立 anchor 树(参考下面建立 anchor 树的代码),并且在成功处理之后调用 ack ,处理失败时调用 fail 函数,通知 Acker。

不满足以上三条中任意一条的都只提供 At Most Once 的消息可靠性保证,如果希望得到 Exactly Once 的消息可靠性保证,可以使用 Trident 进行实现。

不同层次的可靠性保证如何实现如何实现可靠的 Spout
实现可靠的 Spout 需要在 nextTuple 函数中发送消息时,调用带 msgID 的 emit 方法,然后实现失败消息的重传(fail 函数),参考如下示例:

[mw_shl_code=java,true]/**
     * 想实现可靠的 Spout,需要实现如下两点
     * 1. 在 nextTuple 函数中调用 emit 函数时需要带一个     msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)
     * 2. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护)
     */
public void nextTuple() {
    //设置 msgId 和 Value 一样,方便 fail 之后重发
    collector.emit(new Values(curNum + "", round +     ""), curNum + ":" + round);
}

@Override
public void fail(Object msgId) {//消息发送失败时的回调函数
String tmp = (String)msgId;   //上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息
String[] args = tmp.split(":");

//消息进行重发
collector.emit(new Values(args[0], args[1]), msgId);
}[/mw_shl_code]

如何实现可靠的 Bolt
Storm 提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt,都可以实现可靠性消息传递,不过 BaseRichBolt 需要自己做很多周边的事情(建立 anchor 树,以及手动 ACK/FAIL 通知 Acker),使用场景更广泛,而 BaseBasicBolt 则由 Storm 帮忙实现了很多周边的事情,实现起来方便简单,但是使用场景单一。如何用这两个 Bolt 实现(不)可靠的消息传递如下所示:

[mw_shl_code=java,true]//BaseRichBolt 实现不可靠消息传递
public class SplitSentence extends BaseRichBolt {//不建立 anchor 树的例子
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            _collector.emit(new Values(word));  // 不建立 anchor 树
        }
        _collector.ack(tuple);          //手动 ack,如果不建立 anchor 树,是否 ack 是没有区别的,这句可以进行注释
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

//BaseRichBolt 实现可靠的 Bolt
public class SplitSentence extends BaseRichBolt {//建立 anchor 树以及手动 ack 的例子
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            _collector.emit(tuple, new Values(word));  // 建立 anchor 树
        }
        _collector.ack(tuple);          //手动 ack,如果想让 Spout 重发该 Tuple,则调用 _collector.fail(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

下面的示例会可以建立 Multi-anchoring
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

//BaseBasicBolt 是吸纳可靠的消息传递
public class SplitSentence extends BaseBasicBolt {//自动建立 anchor,自动 ack
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}[/mw_shl_code]

Trident
在 Trident 中,Spout 和 State 分别有三种状态,如下图所示:

spout-vs-state.png

其中表格中的 Yes 表示相应的 Spout 和 State 组合可以实现 Exactly Once 语义,No 表示相应的 Spout 和 State 组合不保证 Exactly Once 语义。下面的代码是一个 Trident 示例:

[mw_shl_code=java,true] OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);   //Opaque Spout
    //TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf);   //Transaction Spout

    TridentTopology topology = new TridentTopology();
    String spoutTxid = Utils.kafkaSpoutGroupIdBuilder(topologyConfig.kafkaSrcTopic, topologyConfig.topologyName);
    Stream stream = topology.newStream(spoutTxid, spout)
            .name("new stream")
            .parallelismHint(1);

    // kafka config
    KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();      //KafkaProducerConfig 仅对 kafka 相关配置进行了封装,具体可以参考 TridentKafkaStateFactory2(Map<String, String> config)
    Map<String, String> kafkaConfigs = kafkaProducerConfig.loadFromConfig(topologyConfig);
    TridentToKafkaMapper tridentToKafkaMapper = new TridentToKafkaMapper();  //TridentToKafkaMapper 继承自 TridentTupleToKafkaMapper<String, String>,实现 getMessageFromTuple 接口,该接口中返回 tridentTuple.getString(0);

    String  dstTopic = "test__topic_for_all";

    TridentKafkaStateFactory2 stateFactory = new TridentKafkaStateFactory2(kafkaConfigs);
    stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);
    stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));

    stream.each(new Fields("bytes"), new AddMarkFunction(), new Fields("word")) //从spout 出来数据是一个 bytes 类型的数据,第二个是参数是自己的处理函数,第三个参数是处理函数的输出字段
            .name("write2kafka")
            .partitionPersist(stateFactory         //将数据写入到 Kafka 中,可以保证写入到 Kafka 的数据是 exactly once 的
                    , new Fields("word")
                    , new TridentKafkaUpdater())
            .parallelismHint(1);[/mw_shl_code]


来源:meituan
作者:从贤


已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条