import threading
import time
import kazoo.client
# Python version of CountDownLatch
class CountDownLatch:
    """Thread-safe count-down latch.

    Threads calling :meth:`awaits` block while the counter is positive;
    each :meth:`countDown` call decrements the counter and wakes the waiters
    so they can re-check it.
    """

    def __init__(self, count):
        """Create a latch that opens after ``count`` calls to ``countDown``.

        Args:
            count: Initial counter value. A value of zero or less means the
                latch is already open and ``awaits`` returns immediately.
        """
        self.count = count
        self.condition = threading.Condition()

    def awaits(self):
        """Block the calling thread until the counter is no longer positive."""
        # ``with`` acquires the lock before entering the body and releases it
        # only if it was actually acquired.
        with self.condition:
            # Loop so that a wake-up is always followed by a re-check.
            while self.count > 0:
                self.condition.wait()

    def countDown(self):
        """Decrement the counter and wake every waiting thread.

        Once the counter has reached zero, further calls do nothing, so the
        counter never becomes negative.
        """
        with self.condition:
            if self.count <= 0:
                return
            self.count -= 1
            # notify_all is the non-deprecated spelling of notifyAll.
            self.condition.notify_all()

    def getCount(self):
        """Return the current counter value."""
        return self.count
class MyZK:
    """Demo distributed lock built on ZooKeeper ephemeral sequential nodes.

    Each instance creates its own sequential node. The instance whose node
    sorts first owns the lock; every other instance watches the node directly
    ahead of its own and re-checks the queue when that node is deleted.
    ``cc`` is counted down once this instance owns the lock.
    """

    def __init__(self, name):
        """Prepare (but do not start) a client for the contender ``name``.

        Args:
            name: Label of this contender; it is written as node data and
                printed in the log lines.
        """
        # NOTE(review): "testLcok" looks like a typo for "testLock". It is
        # kept unchanged because it selects the chroot used for the nodes.
        hosts = '192.168.110.130:2181/testLcok'
        self.zk = kazoo.client.KazooClient(hosts=hosts, timeout=3)
        self.name = name
        # Path of this contender's lock node; set by treLock().
        self.pathName = None
        # Counted down once this contender holds the lock.
        self.cc = CountDownLatch(1)

    def treLock(self):
        """Start the client, enqueue this contender and try to take the lock.

        Creates an ephemeral sequential ``/lock`` node and then evaluates the
        queue via :meth:`get_callback`. This method does not wait for the
        lock itself; callers wait on ``self.cc``.
        """
        self.zk.start()
        data = self.zk.create(path='/lock',
                              value=self.name.encode(),
                              ephemeral=True,
                              makepath=True,
                              sequence=True)
        # Nothing to queue on if no node path came back.
        if data is None:
            return
        self.pathName = data
        children_list = self.zk.get_children(path='/', watch=False)
        # Same "am I first, otherwise watch my predecessor" logic as the
        # watch callback, so it lives in one place.
        self.get_callback(children_list)

    def call_watch(self, event):
        """Watch callback: re-evaluate the queue when the watched node is deleted.

        Args:
            event: Watch event; only events whose ``type`` is ``'DELETED'``
                are acted upon.
        """
        if event.type != 'DELETED':
            return
        children_result = self.zk.get_children(path='/', watch=False)
        self.get_callback(children_result)

    def get_callback(self, children_result):
        """Take the lock if this contender is first, else watch its predecessor.

        Args:
            children_result: Names of the current child nodes, or ``None``
                (in which case nothing happens).

        Raises:
            ValueError: If this contender's own node is not among the
                children (raised by ``list.index``).
        """
        while children_result is not None:
            children_list = sorted(children_result)
            # pathName carries a leading '/', child names do not.
            i = children_list.index(self.pathName[1:])
            if i == 0:
                # First in the queue: this contender owns the lock.
                print(f'{self.name} i am fist')
                self.zk.set('/', self.name.encode())
                print(self.zk.get('/'))
                # Unblock whoever is waiting on the latch.
                self.cc.countDown()
                return
            previous = '/' + children_list[i - 1]
            if self.zk.exists(path=previous, watch=self.call_watch) is not None:
                # Predecessor is still there; call_watch fires on its deletion.
                return
            # Predecessor vanished between the listing and exists(): no
            # DELETED event will arrive for it, so re-read the queue now
            # instead of waiting forever.
            children_result = self.zk.get_children(path='/', watch=False)

    def release_key(self):
        """Release the lock by deleting this contender's node and stopping the client."""
        print(f'release_key:{self.pathName}')
        try:
            self.zk.delete(self.pathName, version=-1)
        finally:
            # The client was started in treLock(); stop and close it so the
            # session and its resources are not leaked.
            self.zk.stop()
            self.zk.close()
def run():
    """Thread entry point: obtain the ZooKeeper lock, then release it.

    The current thread's name is used as the contender name.
    """
    # current_thread().name replaces the deprecated currentThread().getName().
    name = threading.current_thread().name
    watchzk = MyZK(name)
    # Queue up for the lock.
    watchzk.treLock()
    try:
        # Block until this contender has been granted the lock.
        watchzk.cc.awaits()
    finally:
        # Release the lock, even if the wait was interrupted.
        watchzk.release_key()
if __name__ == '__main__':
    # Start every contender before joining any of them, so the threads
    # actually compete for the lock, and wait for all of them to finish.
    workers = [threading.Thread(target=run) for _ in range(10)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
# If you have any questions, feel free to leave a comment below to discuss.