问题导读
1、如何理解分流?
2、怎样定义输出流格式?
3、如何识别出数据的来源?
数据流经常需要分流与合并操作,如下图所示:
请参考示例代码
分流
分流有2钟情况,第一种是,相同的tuple发往下一级不同的bolt, 第二种,分别发送不同的tuple到不同的下级bolt上。
发送相同tuple
其实和普通1v1 发送一模一样,就是有2个或多个bolt接收同一个spout或bolt的数据 举例来说:
- SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
- new SequenceSpout(), spoutParal);
-
- builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
- SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
-
- builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
- .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
复制代码
发送不同的tuple
当发送不同的tuple到不同的下级bolt时, 这个时候,就需要引入stream概念,发送方发送a 消息到接收方A'时使用stream A, 发送b 消息到接收方B'时,使用stream B
在topology提交时:
- builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
- SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
-
- builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
- SequenceTopologyDef.SPLIT_BOLT_NAME, // --- 发送方名字
- SequenceTopologyDef.TRADE_STREAM_ID); // --- 接收发送方该stream 的tuple
-
- builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
- .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // --- 发送方名字
- SequenceTopologyDef.CUSTOMER_STREAM_ID); // --- 接收发送方该stream 的tuple
复制代码
在发送消息时:
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- tpsCounter.count();
-
- Long tupleId = tuple.getLong(0);
- Object obj = tuple.getValue(1);
-
- if (obj instanceof TradeCustomer) {
-
- TradeCustomer tradeCustomer = (TradeCustomer)obj;
-
- Pair trade = tradeCustomer.getTrade();
- Pair customer = tradeCustomer.getCustomer();
-
- collector.emit(SequenceTopologyDef.TRADE_STREAM_ID,
- new Values(tupleId, trade));
-
- collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID,
- new Values(tupleId, customer));
- }else if (obj != null){
- LOG.info("Unknow type " + obj.getClass().getName());
- }else {
- LOG.info("Nullpointer " );
- }
-
- }
复制代码
定义输出流格式:
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
- declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
- }
复制代码
接受消息时,需要判断数据流
- if (input.getSourceStreamId().equals(SequenceTopologyDef.TRADE_STREAM_ID) ) {
- customer = pair;
- customerTuple = input;
-
- tradeTuple = tradeMap.get(tupleId);
- if (tradeTuple == null) {
- customerMap.put(tupleId, input);
- return;
- }
-
- trade = (Pair) tradeTuple.getValue(1);
-
- }
复制代码
数据流合并
生成topology时
在下面例子中, MergeRecord 同时接收SequenceTopologyDef.TRADE_BOLT_NAME 和SequenceTopologyDef.CUSTOMER_BOLT_NAME 的数据
- builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
- SequenceTopologyDef.SPLIT_BOLT_NAME,
- SequenceTopologyDef.TRADE_STREAM_ID);
-
- builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
- .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME,
- SequenceTopologyDef.CUSTOMER_STREAM_ID);
-
- builder.setBolt(SequenceTopologyDef.MERGE_BOLT_NAME, new MergeRecord(), 1)
- .shuffleGrouping(SequenceTopologyDef.TRADE_BOLT_NAME)
- .shuffleGrouping(SequenceTopologyDef.CUSTOMER_BOLT_NAME);
复制代码
发送方
发送的bolt和普通一样,无需特殊处理
接收方
接收方是,区分一下来源component即可识别出数据的来源
- if (input.getSourceComponent().equals(SequenceTopologyDef.CUSTOMER_BOLT_NAME) ) {
- customer = pair;
- customerTuple = input;
-
- tradeTuple = tradeMap.get(tupleId);
- if (tradeTuple == null) {
- customerMap.put(tupleId, input);
- return;
- }
-
- trade = (Pair) tradeTuple.getValue(1);
-
- } else if (input.getSourceComponent().equals(SequenceTopologyDef.TRADE_BOLT_NAME)) {
- trade = pair;
- tradeTuple = input;
-
- customerTuple = customerMap.get(tupleId);
- if (customerTuple == null) {
- tradeMap.put(tupleId, input);
- return;
- }
-
- customer = (Pair) customerTuple.getValue(1);
- }
复制代码
请参考代码:代码
|