当前位置: 首页 > 工具软件 > Kazoo > 使用案例 >

使用kazoo连接zookeeper并监听节点数量以及值变化

夏英发
2023-12-01

  目前kazoo是连接zk的最新第三方库,最新更新时间为2019年1月,其他第三方连接zk的库都长时间未更新,所以推荐使用kazoo。前面有几篇文章都已经详细给出了zk的部署,接下来是zk最核心的地方,将zk的数据结构特性跟业务场景相结合,实现复杂需求,本文给出基本demo用法介绍。

1、监控节点数量的变化

  基本操作,创建、更新、删除,kazoo接口已经足够简单,入参类型如果不懂,可以直接看源码,同时也有助于深入了解别人是如何构思python“中间件”

from kazoo import exceptions
from kazoo.client import KazooClient
from kazoo.client import ChildrenWatch
from kazoo.client import DataWatch

def normal_test(zk_path,host,port,node_list):
    zk=KazooClient(hosts=host+':'+port,timeout=5)
    zk.start(timeout=5)

    if not zk.exists(zk_path):
        print("node:{} does't exists".format(zk_path))
        # 创建当前节点,持久性节点,值需要设为byte类型
        zk.create(path=zk_path,value=b'bar')

    # 这里是获取当前节点的子节点列表,可以设定watch以及是否返回节点数据
    child_node_list=zk.get_children(zk_path,watch=None,include_data=False)
    # 创建多个子节点,值可以设为一样,因为这里关注子节点是否存在,不关心其值
    if not child_node_list:
        for sub_node in node_list:
            zk.create(zk_path + '/' + sub_node,b'1')
    else:
        print('subnode list:{}'.format(child_node_list))

    # 获取当前节点的znode对象:含data和ZnodeStat对象
    data,stat=zk.get(zk_path)
    print('current node data:{}'.format(data))
    print('data version:{}'.format(stat.version))
    print('data length:{}'.format(stat.data_length))
    print('children node numbers:{}'.format(stat.numChildren))

    # 更新节点数据,可以指定值和版本,成功更新则ZnodeStat 对象
    stat_new=zk.set(zk_path,value=b'foo')
    print('node {0} is updated:{1}'.format(zk_path,stat_new))

    # 删除当前节点,若当前节点有子节点,则提示无法删除,需要使用递归删除
    zk.delete(zk_path,recursive=True)
    try:
        last=zk.get_children(zk_path)
        print('children nodes :{}'.format(last))
    except exceptions.NoNodeError:
        print('no children nodes')

    zk.stop()
    zk.close()


if __name__=='__main__':
    normal_test(zk_path='/app_conf',host='192.168.100.5',port='2181',node_list=['foo1','foo2','foo3'])

  监控节点数量的变化,可以应用到相关的场景:
1)把节点名称设为服务器IP,可以实现服务器集群管理,服务(服务接口)上、下线通知等,又称服务发现、服务监控等
2)主备切换,把最小临时节点设为master角色,其他临时节点为salve角色
3)独占锁,若只监听一个固定临时节点,当该临时节点创建,则获得锁,否则释放锁
4)分布式锁,不同客户端创建不同临时顺序节点,链式监听节点是否删除事件

2、简单的wacher

  kazoo支持使用装饰器实现一个简单的wacher,kazoo有两种wacher,一个是监听子节点变化,另外一个是监听节点值的变化。监听子节点变化示例:

import time
from kazoo.client import KazooClient
from kazoo.client import DataWatch
from kazoo.client import ChildrenWatch


def watch_child_node(zk_path):
    zkc=KazooClient(hosts='192.168.100.5:2181',timeout=5)
    zkc.start(timeout=5)
    # 直接用装饰器完成监听
    @ChildrenWatch(client=zkc,path=zk_path,send_event=True)
    def get_changes_with_event(children,event):
        print ("Children nodes are %s" % children)
         if event:
            print("catched nodes a children nodes event ",event)
            
    @ChildrenWatch(client=zkc,path=zk_path)
    def get_changes_without_event(children):
        print ("Children are %s" % children)
        
    while True:
    	time.sleep(5)
    	print('watching children node changes.....')
    	
watch_child_node(/app_conf)

# 在zk上连续创建几个子节点,可以看到监听到变化
[zk: localhost:2181(CONNECTED) 2] create /app_conf/foo1 1
Created /app_conf/foo1
[zk: localhost:2181(CONNECTED) 3] create /app_conf/foo2 1
Created /app_conf/foo2
[zk: localhost:2181(CONNECTED) 4] create /app_conf/foo3 1

# 输出,捕捉到节点变化的事件,但zk不会给出这个事件的具体发生情况:子节点被删除的事件、子节点新增的事件
需要客户端根据事件的发生写一段逻辑去获取zk的节点到底是增加了还是减少了。
Children nodes are []
watching children nodes changes.....
watching children nodes changes.....
Children nodes are ['foo1']
catching a children nodes event  WatchedEvent(type='CHILD', state='CONNECTED', path='/app_conf')
watching children nodes changes.....
Children nodes are ['foo1', 'foo2']
catching a children nodes event  WatchedEvent(type='CHILD', state='CONNECTED', path='/app_conf')
watching children nodes changes.....
Children nodes are ['foo1', 'foo2', 'foo3']
catching a children nodes event  WatchedEvent(type='CHILD', state='CONNECTED', path='/app_conf')
Children nodes are ['foo1', 'foo2']
catching a children nodes event  WatchedEvent(type='CHILD', state='CONNECTED', path='/app_conf')

注意:在注册监听环节,可以监听当前节点本身是否删除的事件,以及子节点的增、删事件,若需要zk返回event,那么需要将send_event设为True,才可以在watch函数传入event位置参数,这个逻辑可以在kazoo的源码看到
if self._send_event
result = self._func(children, event)
else:
result = self._func(children)
装饰器有两种写法,一种是从引用kazoo import的ChildrenWatch,

@ChildrenWatch(client=zkc,path=zk_path,send_event=True)
def get_changes_with_event(children,event):
		pass

另外一种是从已创建的zk实例中调用ChildrenWatch

@zkc.ChildrenWatch(path=zk_path,send_event=True)
def get_changes_with_event(children,event):
		pass
2.1 监听节点自身的数据变化
import time
from kazoo.client import KazooClient
from kazoo.client import DataWatch
from kazoo.client import ChildrenWatch

def watch_data(zk_path):
    zkc = KazooClient(hosts='192.168.100.5:2181', timeout=5)
    zkc.start(timeout=5)

    # 直接用装饰器完成监听,节点值的监听还可以拿到zk的事件
    #使用@DataWatch(client=zkc,path=zk_path)或者两种写法都可以
    @zkc.DataWatch(path=zk_path)
    def my_watch(data, stat,event):
        if not data:
            pass

        print("Data is {0} and data type is {1}".format(data, type(data)))
        print("Version is %s" % stat.version)
        if event:
            print("catching a data event ",event)

    while True:
        time.sleep(5)
        print('watching current node data changes.....')

watch_data('/app_conf')
# 在zk 将/app_conf set不同的值,可以看到监听到数据变化
[zk: localhost:2181(CONNECTED) 20] set /app_conf foo
[zk: localhost:2181(CONNECTED) 19] set /app_conf 2
# 输出,注意kazoo返回的是bytes类型的数据,data变化的事件已经捕捉到
Data is b'foo' and data type is <class 'bytes'>
Version is 22
watching current node data changes.....
Data is b'bar' and data type is <class 'bytes'>
Version is 23
catching a data event  WatchedEvent(type='CHANGED', state='CONNECTED', path='/app_conf')

  节点数据的变化,很容易联想相关应用场景:
1)集中配置管理,各个客户端监听放置配置文件内容的节点,若配置有变化,则可以各个客户端拉取配置更新
2)消息队列:
  在特定节点下创建持久顺序节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下znode存储的数据就是消息队列中的消息内容,持久顺序节点就是消息的编号,按排序后取出最小编号节点(先get后delete)。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题

2.2 封装一个同时监听子节点变化和当前节点数据变化的watcher类
import time
from kazoo.client import KazooClient
from kazoo.client import ChildrenWatch
from kazoo.client import DataWatch


class ZKWatcher(object):
    def __init__(self,host,port,timeout=5):

        self._old_node_list=[]
        self._node_name=''
        self._host=host
        self._port=port
        self._time_out=timeout
        self._ip_port=self._host+':'+self._port
        self._zkc=KazooClient(hosts=self._ip_port,timeout=self._time_out)
        self._zkc.start(self._time_out)

    def watcher(self,zk_path):
        # 获取原子节点列表
        self._old_node_list=self._zkc.get_children(zk_path)

        try:
            # 为所要监听的节点开启一个子节点监听器
            ChildrenWatch(client=self._zkc,path=zk_path,func=self._node_change,send_event=True)

            # 为所要监听的节点开启一个该节点值变化的监听器
            DataWatch(client=self._zkc,path=zk_path,func=self._data_change)

        except Exception as e:
            raise

    def _node_change(self,new_node_list,event):

        # 这里的new_node_list是指当前最新的子节点列表
        if not event:
            print('未有事件发生')
            return
		# 当前节点列表与上次拿到的节点列表相等,注意不是长度相等,是列表值和长度都要相等
        if new_node_list == self._old_node_list:
            print('子节点列表未发生变化')
            return
            
        if len(new_node_list)>len(self._old_node_list):
            for new_node in new_node_list:
                if new_node not in self._old_node_list:
                    print('监听到一个新的节点:%s'%str(new_node))
                    self._old_node_list=new_node_list

        else:
            for old_node in self._old_node_list:
                if old_node not in new_node_list:
                    print('监听到一个删除的节点:%s'%str(old_node))
                    self._old_node_list=new_node_list



    def _data_change(self,data,stat,event):
        if not data:
            print('节点已删除,无法获取数据')
            return
        if not event:
            print('未有事件发生')
        print('监听到数据变化')
        print('数据为',data)
        print('数据长度',stat.dataLength)
        print('数据版本号:',stat.version)
        print('子节点数据版本号',stat.cversion)
        print('子节点数量',stat.numChildren)
        print('事件',event)


def run():
    try:
        zk = ZKWatcher(host='192.168.100.5',port='2181')
        zk.watcher('/app_locker')

        while True:
            time.sleep(5)
            print('watching......')
    except Exception as e:
        print(e)


if __name__== "__main__":
    run()
 类似资料: