当前位置: 首页 > 工具软件 > Kazoo v3 > 使用案例 >

python zookeeper kazoo实现分布式锁

张高澹
2023-12-01
import threading
import time
import kazoo.client

# python 版本的CountDown
class CountDownLatch:

    def __init__(self, count):
        self.count = count
        self.condition = threading.Condition()

    def awaits(self):
        try:
            self.condition.acquire()
            while self.count > 0:
                self.condition.wait()
        finally:
            self.condition.release()

    def countDown(self):
        try:
            self.condition.acquire()
            self.count -= 1
            self.condition.notifyAll()
        finally:
            self.condition.release()

    def getCount(self):
        return self.count


class MyZK:
    def __init__(self, name):
        hosts = '192.168.110.130:2181/testLcok'
        self.zk = kazoo.client.KazooClient(hosts=hosts, timeout=3)
        self.name = name
        self.pathName = None
        self.cc = CountDownLatch(1)

    # 获得锁
    def treLock(self):
        self.zk.start()
        data = self.zk.create(path='/lock',
                              value=self.name.encode(),
                              ephemeral=True,
                              makepath=True,
                              sequence=True)
        # 如果创建成功
        if data != None:
            # 获取子节点
            children_list = self.zk.get_children(path='/', watch=False)
            self.pathName = data
            # 判断子节点 是否存在
            if children_list != None:
                # 对子节点排序
                children_list.sort()
                i = children_list.index(self.pathName[1:])
                # 判断是不是第一个
                if i == 0:
                    # 如果是的话就继续执行
                    print(f'{self.name} i am fist')
                    self.zk.set('/', self.name.encode())
                    print(self.zk.get('/'))
                    # 释放停止阻塞
                    self.cc.countDown()
                else:
                    self.zk.exists(path='/' + children_list[i - 1], watch=self.call_watch)

    def call_watch(self, event):
        # node 执行删除的时候再次执行 获取所有的子节点
        if event.type == 'DELETED':
            children_result = self.zk.get_children(path='/', watch=False)
            # 再次执行判断是否是第一个
            self.get_callback(children_result)

    def get_callback(self, children_result):
        if children_result != None:
            children_list = sorted(children_result)
            i = children_list.index(self.pathName[1:])
            # 判断是不是第一个
            if i == 0:
                print(f'{self.name} i am fist')
                self.zk.set('/', self.name.encode())
                print(self.zk.get('/'))
                self.cc.countDown()
            else:
                self.zk.exists(path='/' + children_list[i - 1], watch=self.call_watch)
            # 不是就跳过

    def release_key(self):
        print(f'release_key:{self.pathName}')
        self.zk.delete(self.pathName, version=-1)


def run():
    name = threading.currentThread().getName()
    watchzk = MyZK(name)

    # 获取锁
    watchzk.treLock()
    # 释放锁
    watchzk.cc.awaits()
    watchzk.release_key()


if __name__ == '__main__':
    for i in range(10):
        t = threading.Thread(target=run)
        t.start()
    t.join()

有什么问题大家可以在下面留言讨论

 类似资料: