分享

Storm常见模式——TimeCacheMap

fanbells 发表于 2014-4-12 19:29:52 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 11693
1、TimeCacheMap实现原理是什么?

2、性能如何?



已有(1)人评论

跳转到指定楼层
fanbells 发表于 2014-4-12 19:35:02
本帖最后由 pig2 于 2014-4-12 20:36 编辑

Storm中使用一种叫做TimeCacheMap的数据结构,用于在内存中保存近期活跃的对象,它的实现非常地高效,而且可以自动删除过期不再活跃的对象。
TimeCacheMap使用多个桶buckets来缩小锁的粒度,以此换取高并发读写性能。下面我们来看看TimeCacheMap内部是如何实现的。
1. 实现原理
桶链表:链表中每个元素是一个HashMap,用于保存key,value格式的数据。
    private LinkedList<HashMap<K, V>> _buckets;
锁对象:用于对TimeCacheMap进行get/put等操作时上锁保证原子性。
    private final Object _lock = new Object();
后台清理线程:负责超时后清理数据。
    private Thread _cleaner;
超时回调接口:用于超时后进行函数回调,做一些其他处理。
    public static interface ExpiredCallback<K, V> {        public void expire(K key, V val);    }    private ExpiredCallback _callback;
有了以上数据结构,下面来看看构造函数的具体实现:
1、 首先,初始化指定个数的bucket,以链式链表形式存储,每个bucket中放入空的HashMap;
2、 然后,设置清理线程,处理流程为:
  a)   休眠expirationMillis / (numBuckets-1)毫秒时间(即:expirationSecs / (numBuckets-1)秒);
  b)   对_lock对象上锁,然后从buckets链表中移除最后一个元素;
  c)   向buckets链表头部新加入一个空的HashMap桶,解除_lock对象锁;
  d)   如果设置了callback函数,则进行回调。
  1. public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
  2.         if(numBuckets<2) {
  3.             throw new IllegalArgumentException("numBuckets must be >= 2");
  4.         }
  5.         _buckets = new LinkedList<HashMap<K, V>>();
  6.         for(int i=0; i<numBuckets; i++) {
  7.             _buckets.add(new HashMap<K, V>());
  8.         }
  9.         _callback = callback;
  10.         final long expirationMillis = expirationSecs * 1000L;
  11.         final long sleepTime = expirationMillis / (numBuckets-1);
  12.         _cleaner = new Thread(new Runnable() {
  13.             public void run() {
  14.                 try {
  15.                     while(true) {
  16.                         Map<K, V> dead = null;
  17.                         Time.sleep(sleepTime);
  18.                         synchronized(_lock) {
  19.                             dead = _buckets.removeLast();
  20.                             _buckets.addFirst(new HashMap<K, V>());
  21.                         }
  22.                         if(_callback!=null) {
  23.                             for(Entry<K, V> entry: dead.entrySet()) {
  24.                                 _callback.expire(entry.getKey(), entry.getValue());
  25.                             }
  26.                         }
  27.                     }
  28.                 } catch (InterruptedException ex) {
  29.                 }
  30.             }
  31.         });
  32.         _cleaner.setDaemon(true);
  33.         _cleaner.start();
  34.     }
复制代码

构造函数需要传递三个参数:expirationSecs:超时的时间,单位为秒;numBuckets:桶的个数;callback:超时回调函数。
为了方便使用,还提供了以下三种形式的构造函数,使用时可以根据需要选择:
  1. //this default ensures things expire at most 50% past the expiration time
  2.     private static final int DEFAULT_NUM_BUCKETS = 3;
  3.     public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
  4.         this(expirationSecs, DEFAULT_NUM_BUCKETS, callback);
  5.     }
  6.     public TimeCacheMap(int expirationSecs, int numBuckets) {
  7.         this(expirationSecs, numBuckets, null);
  8.     }
  9.     public TimeCacheMap(int expirationSecs) {
  10.         this(expirationSecs, DEFAULT_NUM_BUCKETS);
  11.     }
复制代码
2. 性能分析
get操作:遍历各个bucket,如果存在指定的key则返回,时间复杂度为O(numBuckets)
  1. public V get(K key) {
  2.         synchronized(_lock) {
  3.             for(HashMap<K, V> bucket: _buckets) {
  4.                 if(bucket.containsKey(key)) {
  5.                     return bucket.get(key);
  6.                 }
  7.             }
  8.             return null;
  9.         }
  10.     }
复制代码

put操作:将key,value放到_buckets的第一个桶中,然后遍历其他numBuckets-1个桶,从HashMap中移除其中键为key的记录,时间复杂度为O(numBuckets)
  1. public void put(K key, V value) {
  2.         synchronized(_lock) {
  3.             Iterator<HashMap<K, V>> it = _buckets.iterator();
  4.             HashMap<K, V> bucket = it.next();
  5.             bucket.put(key, value);
  6.             while(it.hasNext()) {
  7.                 bucket = it.next();
  8.                 bucket.remove(key);
  9.             }
  10.         }
  11.     }
复制代码

remove操作:遍历各个bucket,如果存在以key为键的记录,直接删除,时间复杂度为O(numBuckets)
  1. public Object remove(K key) {
  2.         synchronized(_lock) {
  3.             for(HashMap<K, V> bucket: _buckets) {
  4.                 if(bucket.containsKey(key)) {
  5.                     return bucket.remove(key);
  6.                 }
  7.             }
  8.             return null;
  9.         }
  10.     }
复制代码

containsKey操作:遍历各个bucket,如果存在指定的key则返回true,否则返回false,时间复杂度为O(numBuckets)
  1. public boolean containsKey(K key) {
  2.         synchronized(_lock) {
  3.             for(HashMap<K, V> bucket: _buckets) {
  4.                 if(bucket.containsKey(key)) {
  5.                     return true;
  6.                 }
  7.             }
  8.             return false;
  9.         }
  10.     }
复制代码

size操作:遍历各个bucket,累加各个bucket的HashMap的大小,时间复杂度为O (numBuckets)
  1. public int size() {
  2.         synchronized(_lock) {
  3.             int size = 0;
  4.             for(HashMap<K, V> bucket: _buckets) {
  5.                 size+=bucket.size();
  6.             }
  7.             return size;
  8.         }
  9.     }
复制代码

3. 超时时间
经过上面对put操作和_cleaner线程的分析,我们已经知道:
  a) put操作将数据放到_buckets的第一个桶中,然后遍历其他numBuckets-1个桶,从HashMap中移除其中键为key的记录;
  b) _cleaner线程每隔expirationSecs / (numBuckets-1)秒会把_buckets中最后一个桶中的数据从TimeCacheMap中移除掉。
因此,假设_cleaner线程刚刚清理数据,put函数调用发生将key放入桶中,那么一条数据的超时时间为:
expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1))
然而,假设put函数调用刚刚执行结束,_cleaner线程就开始清理数据,那么一条数据的超时时间为:
expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs
4. 总结
1、 TimeCacheMap的高效之处在于锁的粒度小,O(1)时间内完成锁操作,因此,大部分时间内都可以进行get和put操作。
2、 get,put,remove,containsKey和size操作都可以在O(numBuckets)时间内完成,其中numBuckets是桶的个数,默认为3。
3、 未更新数据的超时时间在expirationSecs和expirationSecs * (1 + 1 / (numBuckets-1))之间。


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条