立即注册 登录
About云-梭伦科技 返回首页

灰太狼_V0的个人空间 https://aboutyun.com/?4629 [收藏] [复制] [分享] [RSS]

日志

[HBASE源码分析]三 Master命令执行线程Worker---MSG_REGION_OPEN

已有 1656 次阅读2014-6-2 22:52

3.1 MSG_REGION_OPEN 3.1.1 命令处理

代码起始位置:HRegionServer 2120行。

首先检测是否已获取到root表所在region或者当前需要openregion是否是root表。如果没有root表所在region信息则将命令重新放到toDo集合中,等待root表信息获取成功。如果root表信息已经获取成功,则创建并执行open region线程。线程主要调用openRegion方法。

方法中,首先将region正在open(HBaseEventType.RS2ZK_REGION_OPENING)的信息写入zookeeper;然后分配hlog;根据配置中hbase.hregion.impl的值创建HRegion实例并初始化;设置偏好节点(flush有关、favoredNodes);对原有region的每个store文件则进行一次compact(创建compact任务,并放入线程池中执行);将region信息放入在线region集合并从opening region集合中移除;向master发送regionopen信息;向zookeeper中写入regionopen完成(HBaseEventType.RS2ZK_REGION_OPENED)。其中HRegion初始化过程比较复杂(构造,紧接着调用initialize方法),下面分析这个过程。

3.1.2 HRegion初始化

代码起始位置:HRegionServer 2257行。

HRegion构造函数:配置(这里的配置指设置对象引用)表文件路径、文件系统对象;获取key比较器(rootmeta和普通表的比较器不同);配置HLog对象;初始化region用配置对象,并将表属性加入配置中;配置master传递过来的region信息;配置memstore flush监听器;根据hbase.hregion.memstore.percolumnfamilyflush.enabled配置是否允许根据单个familiy刷新memstore文件,开启此项后需要配合hb...store.percolumnfamilyflush.flush.size设置flush阈值;初始化region目录;根据hbase.hregion.keyvalue.timestamp.slop.millisecs设置key可容忍的最新的时间戳;根据hbase.hregion.memstore.flush.size设置flushSize;设置是否允许WAL;根据hbase.hregion.memstore.block.multiplier设置触发flush阈值是flushSize的多少倍;根据hbase.hregion.memstore.block.waitonblock设置flush时是否阻塞;初始化扫库任务集合。

HRegion.initialize方法:这个方法是初始化和open region的主要方法。方法开始时,设置任务监控对象,并为任务监控对象设置动态代理(但是动态代理什么都没有做,意义何在啊),任务监控对象保存在TaskAndWeakRefPair对象并存放到任务监控对象集合中,TaskAndWeakRefPair保存监控对象本身和本身的弱引用,当内存匮乏时,弱引用会较快失效,并在获取集合时会清理时导致监控对象失效从而使其从集合中移除,监控对象在initialize的过程中记录执行状态;然后在region目录中创建region描述文件(如果存在则忽略创建过程);删除原有region临时目录;

然后开始加载原有region存储文件。此段逻辑有一大段注释:加载所有region存储文件(HStores)。当replay log时我们不希望丢失任何数据,所以在replay时必须用谨慎的策略。对于每个存储文件(store),计算最大的已保存到文件中的log id(seqId)。在replay时,忽略小于等于log idhlog。我们不能只从所有存储文件中最大的已保存到文件中的log id中选择最小值,因为多余的、不连贯的log id会产生不确定的问题(不是有版本控制和标记删除么?因为标记删除是可配的?)

接下来看看具体逻辑是什么样的:首先,创建加载存储文件线程池,hbase.hstore.open.and.close.threads.max会限制最大线程数;然后对于每个列族(family),多线程地加载每个列族的存储文件并创建Store对象(包含StoreFile集合,StoreFile代表具体的region列族的数据文件);统计除bulk-loaded外的存储文件的最大的log id用于replay;统计包括bulk-loaded的存储文件的最大的log id,用于初始化region分配给新操作的log id值;统计最大时间戳(可以是用户指定的值)+1,用于版本控制;设置每个Store对象的最后刷新时间为当前时间;然后开始replay log,将返回的log id和上面统计的log id取最大值+1返回函数并设置给HLog对象;replay完成后删除HLog文件。

这段逻辑中又有两部分比较复杂:构造Store对象和replay log

3.1.3 创建Store对象

代码起始位置:HRegion 612行。

构造函数中首先调用自己的另一个进行基础设置的构造函数,首先设置region信息和store目录;设置列族信息和配置;根据hbase.hstore.majorcompaction.compression设置major compact压缩算法;根据普通列族、root列族、meta列族设置不同的compactor;初始化MemStore对象,初始化中主要是初始化多个集合;根据hbase.hregion.max.filesize设置存储文件最大长度;根据hbase.hstore.blockingStoreFiles设置最大compact文件数、hbase.hstore.close.check.interval设置每写入多少字节检测一次是否需要compact

其中有一个十分重要的对象的初始化:CacheConfig cacheConf。这个对象负责block的缓存工作。一级缓存使用LruBlockCacheFactory的实例,二级缓存使用L2BucketCacheFactory的实例,两者都是单例的。首先构件CacheConfig工厂对象,它会获取上述两个类型实例并从配置文件和列族信息中读取多项配置(详见CacheConfig 567)。然后,申请缓存并构建CacheConfig对象。

退出进行基础设置的构造函数后,多线程地加载列族目录中的数据文件(忽略长度为0的文件,生成StoreFile对象集合),主要是根据传入参数设置一些属性;如果文件是引用文件,设置引用文件实际路径;根据io.storefile.bloom.enabled决定是否使用布隆过滤器,然后获取文件更新时间。

加载数据文件后,根据hbase.peak.start.hourhbase.peak.end.hour设置hbase尖峰时间;根据hbase.compactionmanager.class设置CompactionManager实现类;根据kvaggregator设置key value处理实现类KeyValueAggregator实例(类似key value处理钩子);根据compaction_hook设置compact钩子CompactionHook实现类。

3.1.4 replay log

代码起始位置:HRegion 647行。

首先,过滤region目录中符合正则表达式"-?[0-9]+"的文件,这些文件为HLog文件;然后对于每个HLog文件,进行replay,并返回最大log id

replay中,首先创建任务监控对象(3.1.2中所述)并根据hbase.regionserver.hlog.reader.impl创建HLog.Reader类实例。然后顺序处理每一条操作日志的每个key/value:检查regionfamiliy是否为要replayregion中的内容;检测操作日志中的log id是否大于store文件中的log id,这一步之所以要在此处判断是为了统计忽略了多少KeyValue;如果通过检测,则将KeyValue对加入MemStore中,加入后如果需要flush MemStore,则在处理当前操作日志完成后flush一次(会在MSG_REGION_FLUSH中具体分析);每个操作处理完成后,都需要向任务监控对象(具体replay数和忽略数)master汇报(将正在处理open region放到2.3中所说的向master发送的消息集合outboundMsgs)

replay操作完成时,标记任务监控对象;返回最后一次replay的操作的log id


路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条