分享

zookeeper适用场景:分布式锁实现

pig2 2014-9-22 23:22:35 发表于 原理型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 52020

问题导读:
1.zookeeper如何实现分布式锁?
2.什么是羊群效应?
3.zookeeper如何释放锁?






zookeeper应用场景有关于分布式集群配置文件同步问题的描述,设想一下如果有100台机器同时对同一台机器上某个文件进行修改,如何才能保证文本不会被写乱,这就是最简单的分布式锁,本文介绍利用zk实现分布式锁。下面是写锁的实现步骤

分布式写锁
create一个PERSISTENT类型的znode,/Locks/write_lock
  • 客户端创建SEQUENCE|EPHEMERAL类型的znode,名字是lockid开头,创建的znode是/Locks/write_lock/lockid0000000001
  • 调用getChildren()不要设置Watcher获取/Locks/write_lock下的znode列表
  • 判断自己步骤2创建znode是不是znode列表中最小的一个,如果是就代表获得了锁,如果不是往下走
  • 调用exists()判断步骤2自己创建的节点编号小1的znode节点(也就是获取的znode节点列表中最小的znode),并且设置Watcher,如果exists()返回false,执行步骤3
  • 如果exists()返回true,那么等待zk通知,从而在回掉函数里返回执行步骤3

释放锁就是删除znode节点或者断开连接就行

*注意:上面步骤2中getChildren()不设置Watcher的原因是,防止羊群效应,如果getChildren()设置了Watcher,那么集群一抖动都会收到通知。在整个分布式锁的竞争过程中,大量重复运行,并且绝大多数的运行结果都是判断出自己并非是序号最小的节点,从而继续等待下一次通知—,这个显然看起来不怎么科学。客户端无端的接受到过多的和自己不相关的事件通知,这如果在集群规模大的时候,会对Server造成很大的性能影响,并且如果一旦同一时间有多个节点的客户端断开连接,这个时候,服务器就会像其余客户端发送大量的事件通知——这就是所谓的羊群效应。
下面是代码实现
  1. import sys
  2. class GJZookeeper(object):
  3.     ZK_HOST = "localhost:2181"
  4.     ROOT = "/Locks"
  5.     WORKERS_PATH = join(ROOT,"write_lock")
  6.     MASTERS_NUM = 1
  7.     TIMEOUT = 10000
  8.     def __init__(self, verbose = True):
  9.         self.VERBOSE = verbose
  10.         self.masters = []
  11.         self.is_master = False
  12.         self.path = None
  13.         self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
  14.         self.say("login ok!")
  15.         # init
  16.         self.__init_zk()
  17.         # register
  18.         self.register()
  19.     def __init_zk(self):
  20.         """
  21.         create the zookeeper node if not exist
  22.         |--Locks
  23.                 |--write_lock
  24.         """
  25.         nodes = (self.ROOT, self.WORKERS_PATH)
  26.         for node in nodes:
  27.             if not self.zk.exists(node):
  28.                 try:
  29.                     self.zk.create(node, "")
  30.                 except:
  31.                     pass
  32.     def register(self):
  33.         """
  34.         register a node for this worker
  35.         |--Locks
  36.                 |--write_lock
  37.                             |--lockid000000000x ==> hostname
  38.         """
  39.         import socket
  40.         hostname = socket.gethostname()
  41.         self.path = self.zk.create(self.WORKERS_PATH + "/lockid", hostname, flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
  42.         self.lockid = basename(self.path)
  43.         self.say("register ok! I'm %s" % self.path)
  44.         # check who is the master
  45.         self.get_lock()
  46.     def get_lock(self):
  47.         """
  48.         get children znode try to get lock
  49.         """
  50.         @watchmethod
  51.         def watcher(event):
  52.             self.say("child changed, try to get lock again.")
  53.             self.get_lock()
  54.         children = self.zk.get_children(self.WORKERS_PATH)
  55.         children.sort()
  56.         min_lock_id = children[0]
  57.         self.say("%s's children: %s" % (self.WORKERS_PATH, children))
  58.         if cmp(self.lockid,min_lock_id) == 0:
  59.             self.get_lock_success()
  60.             return True
  61.         elif cmp(self.lockid,min_lock_id) > 0:
  62.             index = children.index(self.lockid)
  63.             new_lockid_watch = join(self.WORKERS_PATH,children[index-1])
  64.             self.say("Add watch on %s"%new_lockid_watch)
  65.             res = self.zk.exists(new_lockid_watch,watcher)
  66.             if not res :
  67.                 """代表没有存在之前小的锁,但是这并不意味着能拿到锁了,因为还可能有比当前还小的锁,还没轮到,要重新执行一遍"""
  68. #               self.get_lock_success()
  69.                 return False
  70.             else :
  71.                 """现在的锁有人在使用,等他释放了再抢"""
  72.                 self.say("I can not get the lock this time,wait for the next time")
  73.                 return False
  74.     def get_lock_success(self):
  75.         self.say("I get the lock !!!")
  76.         self.write_file()
  77.         self.zk.delete(join(self.WORKERS_PATH,self.lockid))
  78.         self.say("I release the lock !!!")
  79.         sys.exit(1)
  80.     def write_file(self):
  81.         fd = open("lock.log",'a')
  82.         fd.write("%s\n"%self.lockid)
  83.         fd.close()
  84.     def say(self, msg):
  85.         """
  86.         print messages to screen
  87.         """
  88.         if self.VERBOSE:
  89.             if self.path:
  90.                 log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
  91.             else:
  92.                 log.info(msg)
  93. def start_get_lock():
  94.     gj_zookeeper = GJZookeeper()
  95. def main():
  96.     th1 = threading.Thread(target = start_get_lock, name = "thread_1", args = ())
  97.     th1.start()
  98.     th1.join()
  99.    
  100. if __name__ == "__main__":
  101.     main()
  102.     time.sleep(1000)
复制代码






相关文章:
zookeeper原理
zookeeper中Watcher和Notifications
zookeeper适用场景:如何竞选Master及代码实现
zookeeper适用场景:配置文件同步
zookeeper适用场景:分布式锁实现
zookeeper适用场景:zookeeper解决了哪些问题





本帖被以下淘专辑推荐:

已有(6)人评论

跳转到指定楼层
ascentzhen 发表于 2014-9-23 10:57:56
好东西,学习啦,之前面试中被问到过
回复

使用道具 举报

pengsuyun 发表于 2014-12-5 17:16:20
回复

使用道具 举报

sprite101 发表于 2015-5-28 09:28:06
回复

使用道具 举报

zhangzh 发表于 2015-8-11 14:56:48
回复

使用道具 举报

bingyuac 发表于 2016-6-9 16:56:07
受益匪浅 谢了
回复

使用道具 举报

老街的腔调 发表于 2017-12-4 17:01:38
刚看到使用zookeeper实现简易分布式锁
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条