分享

zookeeper项目使用经验总结

本帖最后由 pig2 于 2014-9-26 13:02 编辑
问题导读

1.为什么让集群存在优先级?
2.不同地区(美国、青岛、香港等)的集群,该如何使用zookeeper?
3.异步Watcher处理的作用是什么?
4.Watcher是否为原子性?        






背景  
前段时间学习了zookeeper后,在新的项目中刚好派上了用场,我在项目中主要负责分布式任务调度模块的开发,对我自己来说是个不小的挑战。
  分布式的任务调度,技术上我们选择了zookeeper,具体的整个分布式任务调度的架构选择会另起一篇文章进行介绍。

  本文主要是介绍自己在项目中zookeeper的一些扩展使用,希望可以对大家有所帮助。
  项目中使用的zookeeper版本3.3.3,对应的文档地址: http://zookeeper.apache.org/doc/trunk/

扩展一:优先集群

先来点背景知识:
1.zookeeper中的server机器之间会组成leader/follower集群,1:n的关系。采用了paxos一致性算法保证了数据的一致性,就是leader/follower会采用通讯的方式进行投票来实现paxns。

2.zookeeper还支持一种observer模式,提供只读服务不参与投票,提升系统,对应文档: http://zookeeper.apache.org/doc/trunk/zookeeperObservers.html

我们项目特性的决定了我们需要进行跨机房操作,比如杭州,美国,香港,青岛等多个机房之间进行数据交互。
跨机房之间对应的网络延迟都比较大,比如中美机房走海底光缆有ping操作200ms的延迟,杭州和青岛机房有70ms的延迟。

为了提升系统的网络性能,我们在部署zookeeper网络时会在每个机房部署节点,多个机房之间再组成一个大的网络保证数据一致性。(zookeeper千万别再搞多个集群)

最后的部署结构就会是:
  • 杭州机房  >=3台 (构建leader/follower的zk集群)
  • 青岛机房  >=1台 (构建observer的zk集群)
  • 美国机房  >=1台 (构建observer的zk集群)
  • 香港机房  >=1台 (构建observer的zk集群)

1a065794-4536-391d-839a-26f3275ce378.jpg

一句话概括就是: 在单个机房内组成一个投票集群,外围的机房都会是一个observer集群和投票集群进行数据交互。 这样部署的一些好处,大家可以细细体会一下

针对这样的部署结构,我们会引入一个优先集群问题: 比如在美国机房的机器需要优先去访问本机房的zk集群,访问不到后才去访问杭州机房。
默认在zookeeper3.3.3的实现中,认为所有的节点都是对等的。并没有对应的优先集群的概念,单个机器也没有对应的优先级的概念。

扩展代码:(比较暴力,采用反射的方式改变了zk client的集群列表)
  • 先使用美国机房的集群ip初始化一次zk client
  • 通过反射方式,强制在初始化后的zk client中的server列表中又加入杭州机房的机器列表
  1. ZooKeeper zk = null;  
  2.         try {  
  3.             zk = new ZooKeeper(cluster1, sessionTimeout, new AsyncWatcher() {  
  4.   
  5.                 public void asyncProcess(WatchedEvent event) {  
  6.                     //do nothing   
  7.                 }  
  8.   
  9.             });  
  10.             if (serveraddrs.size() > 1) {  
  11.                 // 强制的声明accessible  
  12.                 ReflectionUtils.makeAccessible(clientCnxnField);  
  13.                 ReflectionUtils.makeAccessible(serverAddrsField);  
  14.                 // 添加第二组集群列表  
  15.                 for (int i = 1; i < serveraddrs.size(); i++) {  
  16.                     String cluster = serveraddrs.get(i);  
  17.                     // 强制获取zk中的地址信息  
  18.                     ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);  
  19.                     List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils  
  20.                             .getField(serverAddrsField, cnxn);  
  21.                     // 添加第二组集群列表  
  22.                     serverAddrs.addAll(buildServerAddrs(cluster));  
  23.                 }  
  24.             }  
  25.         }  
复制代码







扩展二:异步Watcher处理  

最早在看zookeeper的代码时,一直对它的watcher处理比较满意,使用watcher推送数据可以很方便的实现分布式锁的功能。
zookeeper的watcher实现原理也挺简单的,就是在zookeeper client和zookeeper server上都保存一份对应的watcher对象。每个zookeeper机器都会有一份完整的node tree数据和watcher数据,每次leader通知follower/observer数据发生变更后,每个zookeeper server会根据自己节点中的watcher事件推送给响应的zookeeper client,每个zk client收到后再根据内存中的watcher引用,进行回调。

这里会有个问题,就是zk client在处理watcher时,回凋的过程是一个串行的执行过程,所以单个watcher的处理慢会影响整个列表的响应。
可以看一下ClientCnxn类中的EventThread处理,该线程会定时消费一个queue的数据,挨个调用processEvent(Object event) 进行回调处理。

扩展代码:

  1. public abstract class AsyncWatcher implements Watcher {  
  2.   
  3.     private static final int       DEFAULT_POOL_SIZE    = 30;  
  4.     private static final int       DEFAULT_ACCEPT_COUNT = 60;  
  5.   
  6.     private static ExecutorService executor             = new ThreadPoolExecutor(  
  7.                                                                 1,  
  8.                                                                 DEFAULT_POOL_SIZE,  
  9.                                                                 0L,  
  10.                                                                 TimeUnit.MILLISECONDS,  
  11.                                                                 new ArrayBlockingQueue(  
  12.                                                                         DEFAULT_ACCEPT_COUNT),  
  13.                                                                 new NamedThreadFactory(  
  14.                                                                         "Arbitrate-Async-Watcher"),  
  15.                                                                 new ThreadPoolExecutor.CallerRunsPolicy());  
  16.   
  17.     public void process(final WatchedEvent event) {  
  18.         executor.execute(new Runnable() {//提交异步处理  
  19.   
  20.                     @Override  
  21.                     public void run() {  
  22.                         asyncProcess(event);  
  23.                     }  
  24.                 });  
  25.   
  26.     }  
  27.   
  28.     public abstract void asyncProcess(WatchedEvent event);  
  29.   
  30. }  
复制代码








说明:
  • zookeeper针对watcher的调用是以单线程串行的方式进行处理,容易造成堵塞影响,monitor的数据同步及时性
  • AsyncWatcher为采取的一种策略为当不超过acceptCount=60的任务时,会采用异步线程的方式处理。如果超过60任务,会变为原先的单线程串行的模式
扩展三:重试处理

这个也不多说啥,看一下相关文档就清楚了

需要特殊处理下ConnectionLoss的异常,一种可恢复的异常。

重试处理:

  1. public interface ZooKeeperOperation<T> {  
  2.   
  3.     public T execute() throws KeeperException, InterruptedException;  
  4. }  
  5.   
  6.   
  7. /**
  8.      * 包装重试策略
  9.      */  
  10.     public <T> T retryOperation(ZooKeeperOperation<T> operation) throws KeeperException,  
  11.             InterruptedException {  
  12.         KeeperException exception = null;  
  13.         for (int i = 0; i < maxRetry; i++) {  
  14.             try {  
  15.                 return (T) operation.execute();  
  16.             } catch (KeeperException.SessionExpiredException e) {  
  17.                 logger.warn("Session expired for: " + this + " so reconnecting due to: " + e, e);  
  18.                 throw e;  
  19.             } catch (KeeperException.ConnectionLossException e) { //特殊处理Connection Loss  
  20.                 if (exception == null) {  
  21.                     exception = e;  
  22.                 }  
  23.                 logger.warn("Attempt " + i + " failed with connection loss so "  
  24.                         + "attempting to reconnect: " + e, e);  
  25.   
  26.                 retryDelay(i);  
  27.             }  
  28.         }  
  29.   
  30.         throw exception;  
  31.     }  
复制代码






注意点:Watcher原子性

在使用zookeeper的过程中,需要特别注意一点就是注册对应watcher事件时,如果当前的节点已经满足了条件,比如exist的watcher,它不会触发你的watcher,而会等待下一次watcher条件的满足。
它的watcher是一个一次性的监听,而不是一个永久的订阅过程。所以在watcher响应和再次注册watcher过程并不是一个原子操作,编写多线程代码和锁时需要特别注意

总结  zookeepr是一个挺不错的产品,源代码写的也非常不错,大量使用了queue和异步Thread的处理模式,真是一个伟大的产品。









已有(10)人评论

跳转到指定楼层
pengsuyun 发表于 2014-9-26 06:36:00
好东西,必须顶顶。
回复

使用道具 举报

hb1984 发表于 2014-9-26 11:47:47
谢谢楼主分享。                 
回复

使用道具 举报

pengsuyun 发表于 2014-12-5 17:14:48
谢谢楼主分享。   
回复

使用道具 举报

chinaboy2005 发表于 2014-12-6 14:44:59
学习中,资料挺好的
回复

使用道具 举报

crazyfish1986 发表于 2014-12-7 09:57:10
zookeeper的解读资料很少,求这方面的资料
回复

使用道具 举报

lyk20042810 发表于 2015-8-26 15:31:29
很不错的资料
回复

使用道具 举报

zcfightings 发表于 2015-9-10 17:15:26
先顶起  不过现在还不太懂 继续学习zookeeper  怀挺  怀挺  怀挺
回复

使用道具 举报

xiaosong_6666 发表于 2016-5-18 17:57:01
不错的文章
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条