目前kazoo是连接zk的最新第三方库,最新更新时间为2019年1月,其他第三方连接zk的库都长时间未更新,所以推荐使用kazoo。前面有几篇文章都已经详细给出了zk的部署,接下来是zk最核心的地方,将zk的数据结构特性跟业务场景相结合,实现复杂需求,本文给出基本demo用法介绍。
基本操作,创建、更新、删除,kazoo接口已经足够简单,入参类型如果不懂,可以直接看源码,同时也有助于深入了解别人是如何构思python“中间件”
from kazoo import exceptions
from kazoo.client import KazooClient
from kazoo.client import ChildrenWatch
from kazoo.client import DataWatch
def normal_test(zk_path,host,port,node_list):
zk=KazooClient(hosts=host+':'+port,timeout=5)
zk.start(timeout=5)
if not zk.exists(zk_path):
print("node:{} does't exists".format(zk_path))
# 创建当前节点,持久性节点,值需要设为byte类型
zk.create(path=zk_path,value=b'bar')
# 这里是获取当前节点的子节点列表,可以设定watch以及是否返回节点数据
child_node_list=zk.get_children(zk_path,watch=None,include_data=False)
# 创建多个子节点,值可以设为一样,因为这里关注子节点是否存在,不关心其值
if not child_node_list:
for sub_node in node_list:
zk.create(zk_path + '/' + sub_node,b'1')
else:
print('subnode list:{}'.format(child_node_list))
# 获取当前节点的znode对象:含data和ZnodeStat对象
data,stat=zk.get(zk_path)
print('current node data:{}'.format(data))
print('data version:{}'.format(stat.version))
print('data length:{}'.format(stat.data_length))
print('children node numbers:{}'.format(stat.numChildren))
# 更新节点数据,可以指定值和版本,成功更新则ZnodeStat 对象
stat_new=zk.set(zk_path,value=b'foo')
print('node {0} is updated:{1}'.format(zk_path,stat_new))
# 删除当前节点,若当前节点有子节点,则提示无法删除,需要使用递归删除
zk.delete(zk_path,recursive=True)
try:
last=zk.get_children(zk_path)
print('children nodes :{}'.format(last))
except exceptions.NoNodeError:
print('no children nodes')
zk.stop()
zk.close()
if __name__=='__main__':
normal_test(zk_path='/app_conf',host='192.168.100.5',port='2181',node_list=['foo1','foo2','foo3'])
监控节点数量的变化,可以应用到相关的场景:
1)把节点名称设为服务器IP,可以实现服务器集群管理,服务(服务接口)上、下线通知等,又称服务发现、服务监控等
2)主备切换,把最小临时节点设为master角色,其他临时节点为salve角色
3)独占锁,若只监听一个固定临时节点,当该临时节点创建,则获得锁,否则释放锁
4)分布式锁,不同客户端创建不同临时顺序节点,链式监听节点是否删除事件
kazoo支持使用装饰器实现一个简单的wacher,kazoo有两种wacher,一个是监听子节点变化,另外一个是监听节点值的变化。监听子节点变化示例:
import time
from kazoo.client import KazooClient
from kazoo.client import DataWatch
from kazoo.client import ChildrenWatch
def watch_child_node(zk_path):
zkc=KazooClient(hosts='192.168.100.5:2181',timeout=5)
zkc.start(timeout=5)
# 直接用装饰器完成监听
@ChildrenWatch(client=zkc,path=zk_path,send_event=True)
def get_changes_with_event(children,event):
print ("Children nodes are %s" % children)
if event:
print("catched nodes a children nodes event ",event)
@ChildrenWatch(client=zkc,path=zk_path)
def get_changes_without_event(children):
print ("Children are %s" % children)
while True:
time.sleep(5)
print('watching children node changes.....')
watch_child_node(/app_conf)
# 在zk上连续创建几个子节点,可以看到监听到变化
[zk: localhost:2181(CONNECTED) 2] create /app_conf/foo1 1
Created /app_conf/foo1
[zk: localhost:2181(CONNECTED) 3] create /app_conf/foo2 1
Created /app_conf/foo2
[zk: localhost:2181(CONNECTED) 4] create /app_conf/foo3 1
# 输出,捕捉到节点变化的事件,但zk不会给出这个事件的具体发生情况:子节点被删除的事件、子节点新增的事件
需要客户端根据事件的发生写一段逻辑去获取zk的节点到底是增加了还是减少了。
Children nodes are []
watching children nodes changes.....
watching children nodes changes.....
Children nodes are ['foo1']
catching a children nodes event WatchedEvent(type='CHILD', state='CONNECTED', path='/app_conf')
watching children nodes changes.....
Children nodes are ['foo1', 'foo2']
catching a children nodes event WatchedEvent(type='CHILD', state='CONNECTED', path='/app_conf')
watching children nodes changes.....
Children nodes are ['foo1', 'foo2', 'foo3']
catching a children nodes event WatchedEvent(type='CHILD', state='CONNECTED', path='/app_conf')
Children nodes are ['foo1', 'foo2']
catching a children nodes event WatchedEvent(type='CHILD', state='CONNECTED', path='/app_conf')
注意:在注册监听环节,可以监听当前节点本身是否删除的事件,以及子节点的增、删事件,若需要zk返回event,那么需要将send_event设为True,才可以在watch函数传入event位置参数,这个逻辑可以在kazoo的源码看到
if self._send_event
result = self._func(children, event)
else:
result = self._func(children)
装饰器有两种写法,一种是从引用kazoo import的ChildrenWatch,
@ChildrenWatch(client=zkc,path=zk_path,send_event=True)
def get_changes_with_event(children,event):
pass
另外一种是从已创建的zk实例中调用ChildrenWatch
@zkc.ChildrenWatch(path=zk_path,send_event=True)
def get_changes_with_event(children,event):
pass
import time
from kazoo.client import KazooClient
from kazoo.client import DataWatch
from kazoo.client import ChildrenWatch
def watch_data(zk_path):
zkc = KazooClient(hosts='192.168.100.5:2181', timeout=5)
zkc.start(timeout=5)
# 直接用装饰器完成监听,节点值的监听还可以拿到zk的事件
#使用@DataWatch(client=zkc,path=zk_path)或者两种写法都可以
@zkc.DataWatch(path=zk_path)
def my_watch(data, stat,event):
if not data:
pass
print("Data is {0} and data type is {1}".format(data, type(data)))
print("Version is %s" % stat.version)
if event:
print("catching a data event ",event)
while True:
time.sleep(5)
print('watching current node data changes.....')
watch_data('/app_conf')
# 在zk 将/app_conf set不同的值,可以看到监听到数据变化
[zk: localhost:2181(CONNECTED) 20] set /app_conf foo
[zk: localhost:2181(CONNECTED) 19] set /app_conf 2
# 输出,注意kazoo返回的是bytes类型的数据,data变化的事件已经捕捉到
Data is b'foo' and data type is <class 'bytes'>
Version is 22
watching current node data changes.....
Data is b'bar' and data type is <class 'bytes'>
Version is 23
catching a data event WatchedEvent(type='CHANGED', state='CONNECTED', path='/app_conf')
节点数据的变化,很容易联想相关应用场景:
1)集中配置管理,各个客户端监听放置配置文件内容的节点,若配置有变化,则可以各个客户端拉取配置更新
2)消息队列:
在特定节点下创建持久顺序节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下znode存储的数据就是消息队列中的消息内容,持久顺序节点就是消息的编号,按排序后取出最小编号节点(先get后delete)。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题
import time
from kazoo.client import KazooClient
from kazoo.client import ChildrenWatch
from kazoo.client import DataWatch
class ZKWatcher(object):
def __init__(self,host,port,timeout=5):
self._old_node_list=[]
self._node_name=''
self._host=host
self._port=port
self._time_out=timeout
self._ip_port=self._host+':'+self._port
self._zkc=KazooClient(hosts=self._ip_port,timeout=self._time_out)
self._zkc.start(self._time_out)
def watcher(self,zk_path):
# 获取原子节点列表
self._old_node_list=self._zkc.get_children(zk_path)
try:
# 为所要监听的节点开启一个子节点监听器
ChildrenWatch(client=self._zkc,path=zk_path,func=self._node_change,send_event=True)
# 为所要监听的节点开启一个该节点值变化的监听器
DataWatch(client=self._zkc,path=zk_path,func=self._data_change)
except Exception as e:
raise
def _node_change(self,new_node_list,event):
# 这里的new_node_list是指当前最新的子节点列表
if not event:
print('未有事件发生')
return
# 当前节点列表与上次拿到的节点列表相等,注意不是长度相等,是列表值和长度都要相等
if new_node_list == self._old_node_list:
print('子节点列表未发生变化')
return
if len(new_node_list)>len(self._old_node_list):
for new_node in new_node_list:
if new_node not in self._old_node_list:
print('监听到一个新的节点:%s'%str(new_node))
self._old_node_list=new_node_list
else:
for old_node in self._old_node_list:
if old_node not in new_node_list:
print('监听到一个删除的节点:%s'%str(old_node))
self._old_node_list=new_node_list
def _data_change(self,data,stat,event):
if not data:
print('节点已删除,无法获取数据')
return
if not event:
print('未有事件发生')
print('监听到数据变化')
print('数据为',data)
print('数据长度',stat.dataLength)
print('数据版本号:',stat.version)
print('子节点数据版本号',stat.cversion)
print('子节点数量',stat.numChildren)
print('事件',event)
def run():
try:
zk = ZKWatcher(host='192.168.100.5',port='2181')
zk.watcher('/app_locker')
while True:
time.sleep(5)
print('watching......')
except Exception as e:
print(e)
if __name__== "__main__":
run()