分享

Storm常见模式2——批处理

pig2 发表于 2014-4-14 16:31:18 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 10 26568
本帖最后由 pig2 于 2014-4-14 17:16 编辑
阅读本文下面出现一个概念tuple,这里补充一下:
消息流是storm里面的最关键的抽象。一个消息流是一个没有边界的tuple序列, 而这些tuples会被以一种分布式的方式并行地创建和处理。 对消息流的定义主要是对消息流里面的tuple的定义, 我们会给tuple里的每个字段一个名字。 并且不同tuple的对应字段的类型必须一样。 也就是说: 两个tuple的第一个字段的类型必须一样, 第二个字段的类型必须一样, 但是第一个字段和第二个字段可以有不同的类型。 在默认的情况下, tuple的字段类型可以是: integer, long, short, byte, string, double, float, boolean和byte array。 你还可以自定义类型 — 只要你实现对应的序列化器。
更多概念可以参考:
导读:
1.Storm对流数据进行实时处理时,如何处理tuple?
2.ConcurrentLinkedQueue是否可存储tuple?

在读文章内容的时候,经常会忘掉里面的术语是什么,这里在strom中都补充一张图:
Storm对流数据进行实时处理时,一种常见场景是批量一起处理一定数量的tuple元组,而不是每接收一个tuple就立刻处理一个tuple,这样可能是性能的考虑,或者是具体业务的需要。
例如,批量查询或者更新数据库,如果每一条tuple生成一条sql执行一次数据库操作,数据量大的时候,效率会比批量处理的低很多,影响系统吞吐量。
当然,如果要使用Storm的可靠数据处理机制的话,应该使用容器将这些tuple的引用缓存到内存中,直到批量处理的时候,ack这些tuple。
下面给出一个简单的代码示例:
现在,假设我们已经有了一个DBManager数据库操作接口类,它至少有两个接口:
(1)getConnection(): 返回一个java.sql.Connection对象;
(2)getSQL(Tuple tuple): 根据tuple元组生成数据库操作语句。
为了在Bolt中缓存一定数量的tuple,构造Bolt时传递int n参数赋给Bolt的成员变量int count,指定每个n条tuple批量处理一次。
同时,为了在内存中缓存缓存Tuple,使用java concurrent中的ConcurrentLinkedQueue来存储tuple,每当攒够count条tuple,就触发批量处理。
另外,考虑到数据量小(如很长时间内都没有攒够count条tuple)或者count条数设置过大时,因此,Bolt中加入了一个定时器,保证最多每个1秒钟进行一次批量处理tuple。
下面是Bolt的完整代码(仅供参考):
  1. import java.util.Map;
  2. import java.util.Queue;
  3. import java.util.concurrent.ConcurrentLinkedQueue;
  4. import java.sql.Connection;
  5. import java.sql.SQLException;
  6. import java.sql.Statement;
  7. import backtype.storm.task.OutputCollector;
  8. import backtype.storm.task.TopologyContext;
  9. import backtype.storm.topology.IRichBolt;
  10. import backtype.storm.topology.OutputFieldsDeclarer;
  11. import backtype.storm.tuple.Tuple;
  12. public class BatchingBolt implements IRichBolt {
  13.     private static final long serialVersionUID = 1L;
  14.     private OutputCollector collector;
  15.     private Queue<Tuple> tupleQueue = new ConcurrentLinkedQueue<Tuple>();
  16.     private int count;
  17.     private long lastTime;
  18.     private Connection conn;
  19.     public BatchingBolt(int n) {
  20.         count = n; //批量处理的Tuple记录条数
  21.         conn = DBManger.getConnection(); //通过DBManager获取数据库连接
  22.         lastTime = System.currentTimeMillis(); //上次批量处理的时间戳
  23.     }
  24.     @Override
  25.     public void prepare(Map stormConf, TopologyContext context,
  26.             OutputCollector collector) {
  27.         this.collector = collector;
  28.     }
  29.     @Override
  30.     public void execute(Tuple tuple) {
  31.         tupleQueue.add(tuple);
  32.         long currentTime = System.currentTimeMillis();
  33.         // 每count条tuple批量提交一次,或者每个1秒钟提交一次
  34.         if (tupleQueue.size() >= count || currentTime >= lastTime + 1000) {
  35.             Statement stmt = conn.createStatement();
  36.             conn.setAutoCommit(false);
  37.             for (int i = 0; i < count; i++) {
  38.                 Tuple tup = (Tuple) tupleQueue.poll();
  39.                 String sql = DBManager.getSQL(tup); //生成sql语句
  40.                 stmt.addBatch(sql); //加入sql
  41.                 collector.ack(tup); //进行ack
  42.             }
  43.             stmt.executeBatch(); //批量提交sql
  44.             conn.commit();
  45.             conn.setAutoCommit(true);
  46.             System.out.println("batch insert data into database, total records: " + count);
  47.             lastTime = currentTime;
  48.         }
  49.     }
  50.     @Override
  51.     public void cleanup() {
  52.     }
  53.     @Override
  54.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  55.     }
  56.     @Override
  57.     public Map<String, Object> getComponentConfiguration() {
  58.         // TODO Auto-generated method stub
  59.         return null;
  60.     }
  61. }
复制代码



已有(11)人评论

跳转到指定楼层
南桑 发表于 2014-4-14 16:43:24
提示: 该帖被管理员或版主屏蔽
回复

使用道具 举报

Diablo 发表于 2014-4-14 17:05:34
谢谢楼主很清楚
回复

使用道具 举报

梦回三国 发表于 2014-10-16 21:33:59
我想请教一个问题,有时候当spout发送过来的数据并没有按顺序到达bolt时,这样处理就不对了吧
回复

使用道具 举报

sstutu 发表于 2014-10-17 10:39:13
梦回三国 发表于 2014-10-16 21:33
我想请教一个问题,有时候当spout发送过来的数据并没有按顺序到达bolt时,这样处理就不对了吧
storm是流式处理,由于有多个worker,可能处理的有快有慢

点评

现在正在学习,受教了。  发表于 2018-8-30 10:28
回复

使用道具 举报

抠你的鼻屎去 发表于 2015-3-9 11:34:03
1、ConcurrentLinkedQueue 这个队列的size方法是遍历所有内容,效率比较慢。
2、poll时 是否需要同步问题synchronized(queue)。
回复

使用道具 举报

admln 发表于 2015-4-28 09:52:39
代码里面的FOR循环不应该这样写吧
for (int i = 0; i < count; i++) {}
个人觉得应该是:
for (int i = 0; i < tupleQueue.size(); i++) {}
回复

使用道具 举报

sprite101 发表于 2015-5-6 15:58:58
回复

使用道具 举报

琉璃法师 发表于 2016-8-4 10:40:58
因为Storm是流式计算的,前一个tuple会不会被使用要依赖于后面有没有tuple进来,上述代码假设没有后续tuple进来,那它的队列大小和定时器的计算就不会被触发,所以在最终的统计结果会比原计划的结果数量最多会少count条。

推荐一种方式:使用java的定时器,在prepare里掉用这个定时器,定时器里的方法会隔一段时间就执行,在execute方法中定义好相关入库的参数 ,语句之类的,下面贴上我做的关于批量的定时器方法,欢迎指正:

public void timer1() {
                Timer timer = new Timer();
                timer.scheduleAtFixedRate(new TimerTask() {
                        public void run() {
                                if (!omList.isEmpty()) {
                                        synchronized (omList) {
                                                try {
                                                        List<Tuple> tupleList = new LinkedList<Tuple>();    //tuple存到list
                                                        List<Map<String, String>> ucmaplist = new LinkedList<Map<String, String>>();  //该用户在注册渠道下出现
                                                        List<Map<String, String>> cmaplist = new LinkedList<Map<String, String>>();                //该用户在购彩渠道下出现
                                                        List<List<Column>> sqlList = new LinkedList<List<Column>>();     // 入库的问号传参的参数列表
                                                        List<String> orderIdList = new LinkedList<String>();                        //流量里的订单列表
                                                        List<String> byorderidlist = new LinkedList<String>();                        //redis里存的每条流量来了后 订单号选择的keys
                                                        List<String> byuchannelslist = new LinkedList<String>();                //redis里存的每条流量来了后 注册渠道选择的keys
                                                        List<String> bychannelslist = new LinkedList<String>();                        //redis里存的每条流量来了后 购彩渠道选择的keys
                                                               
                                                        // OrderMsg 该类里封装了每条流量来了会得到的属性
                                                        for (OrderMsg om : omList) {
                                                                ucmaplist.add(om.getUcMap());
                                                                cmaplist.add(om.getcMap());
                                                                byuchannelslist.add(om.getByuchannels());
                                                                bychannelslist.add(om.getBychannels());
                                                                byorderidlist.add(om.getByorderids());
                                                                orderIdList.add(om.getOrderId());
                                                                tupleList.add(om.getTuple());
                                                                sqlList.add(om.getUcTableList());
                                                                sqlList.add(om.getcTableList());
                                                        }
                                                        try {
                                                                //批量入库
                                                                client.executeSqlBatch(ResultTableSql.ChannelCountSql, sqlList);
                                                                //ack 调用
                                                                ack_next(tupleList);
                                                                //下面是出现异常后导致入库失败,将失败的tuple进行重发,已经成功的进行ack处理
                                                        } catch (Exception e) {
                                                                System.out.println("!@#@#####$:mesg:" + e.getMessage());
                                                                if (e.getMessage() != null && !e.getMessage().equals("")) {
                                                                        Integer msg = Integer.parseInt(e.getMessage());
                                                                        if (msg % 2 != 0) {
                                                                                System.err.println("********多出来一条已经被入库的SQL语句!!!");
                                                                                System.err.println("@@@@@@@@该SQL语句对应的问号参数是:"+sqlList.get(msg-1));                                                                               
                                                                        }
                                                                        msg = msg / 2;
                                                                        for (int i = 0; i < msg; i++) {
                                                                                collector.ack(tupleList.get(0));
                                                                                tupleList.remove(0);
                                                                                ucmaplist.remove(0);
                                                                                cmaplist.remove(0);
                                                                                byuchannelslist.remove(0);
                                                                                bychannelslist.remove(0);
                                                                                byorderidlist.remove(0);
                                                                                orderIdList.remove(0);
                                                                        }                                                               
                                                                }
                                                                for (int i = 0; i < byorderidlist.size(); i++) {
                                                                        // 删除redis里的orderId
                                                                        redisKeys_next(byorderidlist.get(i), orderIdList.get(i));
                                                                        // 删除redis里用户+注册渠道
                                                                        String uckey = ucmaplist.get(i).keySet().iterator().next();
                                                                        if (ucmaplist.get(i).get(uckey).equals("1")) {
                                                                                redisKeys_next(byuchannelslist.get(i), uckey);
                                                                        }
                                                                        // 删除redis里的用户+购彩渠道
                                                                        String ckey = cmaplist.get(i).keySet().iterator().next();
                                                                        if (cmaplist.get(i).get(ckey).equals("1")) {
                                                                                redisKeys_next(bychannelslist.get(i), ckey);
                                                                        }
                                                                        // 重发
                                                                        collector.fail(tupleList.get(i));
                                                                }
                                                        } finally {
                                                                ucmaplist.clear();
                                                                cmaplist.clear();
                                                                sqlList.clear();
                                                                tupleList.clear();
                                                                orderIdList.clear();
                                                                byorderidlist.clear();
                                                                byuchannelslist.clear();
                                                                bychannelslist.clear();
                                                        }
                                                } catch (Exception e) {
                                                        System.out.println("定时器外围捕获到异常"+e.getMessage());
                                                } finally {
                                                        omList.clear();
                                                }
                                        }
                                }
                        }
                }, 1000, orderBoltTimer);
        }

在execute里要用synchronize进行对OrderMsg类进行处理:
        synchronized (omList) {
                                        omList.add(om);
                                }       

定时器里的ack_next()方法:       
public void ack_next(List<Tuple> list) {
                Iterator<Tuple> iter = list.iterator();
                while (iter.hasNext()) {
                        Tuple str = iter.next();
                        collector.ack(str);
                }
        }       
回复

使用道具 举报

daozhu 发表于 2017-2-14 18:26:11
不失为一种批量处理的方法
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条